From d9bc81fe5fa3e5140ff538ffbc1f3f8dd4386a82 Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Fri, 13 Apr 2018 18:28:41 +0100 Subject: [PATCH] integration test of broadcast mostly ready; there is a thread termination issue however --- src/broadcast/mod.rs | 22 ++--- src/commst.rs | 4 +- src/lib.rs | 5 +- src/messaging.rs | 51 ++++++++---- src/node.rs | 7 +- src/stream_io.rs | 21 ----- tests/broadcast.rs | 192 +++++++++++++++++++++++++++++++++++-------- tests/node_comms.rs | 92 +++++++++++++++++++++ 8 files changed, 306 insertions(+), 88 deletions(-) delete mode 100644 src/stream_io.rs create mode 100644 tests/node_comms.rs diff --git a/src/broadcast/mod.rs b/src/broadcast/mod.rs index 2892b13..e010b8b 100644 --- a/src/broadcast/mod.rs +++ b/src/broadcast/mod.rs @@ -37,8 +37,8 @@ pub struct Instance<'a, T: 'a + Clone + Debug + Send + Sync> { num_faulty_nodes: usize } -impl<'a, T: Clone + Debug + Hashable + Send + Sync + Into> - + From>> +impl<'a, T: Clone + Debug + Hashable + Send + Sync + + Into> + From>> Instance<'a, T> { pub fn new(tx: &'a Sender>, @@ -96,17 +96,21 @@ pub enum Error { Recv(RecvError) } -impl From for Error { +impl + From for Error +{ fn from(err: rse::Error) -> Error { Error::ReedSolomon(err) } } -impl From>> - for Error +impl + From>> for Error { fn from(err: SendError>) -> Error { Error::Send(err) } } -impl From for Error { +impl + From for Error +{ fn from(err: RecvError) -> Error { Error::Recv(err) } } @@ -119,8 +123,7 @@ fn send_shards<'a, T>(value: T, tx: &'a Sender>, coding: &ReedSolomon) -> Result, Error> -where T: Clone + Debug + Hashable + Send + Sync + Into> - + From> +where T: Clone + Debug + Hashable + Send + Sync + Into> + From> { let data_shard_num = coding.data_shard_count(); let parity_shard_num = coding.parity_shard_count(); @@ -206,8 +209,7 @@ fn inner_run<'a, T>(tx: &'a Sender>, num_nodes: usize, num_faulty_nodes: usize) -> Result> -where T: Clone + Debug + Hashable + Send + Sync + Into> - + From> +where T: Clone + Debug + Hashable + Send + Sync + Into> + From> { // Erasure coding scheme: N - 2f value shards and 2f parity shards let parity_shard_num = 2 * num_faulty_nodes; diff --git a/src/commst.rs b/src/commst.rs index 42cc171..4035fc5 100644 --- a/src/commst.rs +++ b/src/commst.rs @@ -12,7 +12,6 @@ use proto::Message; use proto_io; use proto_io::ProtoIo; use messaging::SourcedMessage; -use stream_io::StreamIo; #[derive(Debug)] pub enum Error { @@ -38,8 +37,7 @@ pub struct CommsTask pub node_index: usize } -impl - <'a, T: 'a + Clone + Debug + Send + Sync + From> + Into>> +impl <'a, T: 'a + Clone + Debug + Send + Sync + From> + Into>> CommsTask<'a, T> { pub fn new(tx: &'a Sender>, diff --git a/src/lib.rs b/src/lib.rs index 50d528f..febb611 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,11 +50,10 @@ extern crate reed_solomon_erasure; mod connection; pub mod messaging; -mod stream_io; pub mod proto; mod proto_io; mod commst; -mod broadcast; -mod agreement; +pub mod broadcast; +pub mod agreement; pub mod node; diff --git a/src/messaging.rs b/src/messaging.rs index dbbde93..379e073 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -1,7 +1,7 @@ //! The local message delivery system. 
use std::fmt::Debug; -use crossbeam::Scope; -use crossbeam_channel::{unbounded, Sender, Receiver}; +use crossbeam::{Scope, ScopedJoinHandle}; +use crossbeam_channel::{bounded, unbounded, Sender, Receiver}; use proto::Message; /// Message destination can be either of the two: @@ -23,7 +23,8 @@ pub struct TargetedMessage { pub message: Message } -impl TargetedMessage { +impl TargetedMessage +{ /// Initialises a message while checking parameter preconditions. pub fn new(target: Target, message: Message) -> Option { match target { @@ -70,6 +71,10 @@ pub struct Messaging { to_algo_rxs: Vec>>, /// TX handle to be used by algo tasks. from_algo_tx: Sender>, + + /// Control channel used to stop the listening thread. + stop_tx: Sender<()>, + stop_rx: Receiver<()>, } impl Messaging { @@ -101,20 +106,25 @@ impl Messaging { .collect(); let (from_algo_tx, from_algo_rx) = unbounded(); + let (stop_tx, stop_rx) = bounded(1); + Messaging { - num_nodes: num_nodes, + num_nodes, // internally used handles - to_comms_txs: to_comms_txs, - from_comms_rx: from_comms_rx, - to_algo_txs: to_algo_txs, - from_algo_rx: from_algo_rx, + to_comms_txs, + from_comms_rx, + to_algo_txs, + from_algo_rx, // externally used handles - to_comms_rxs: to_comms_rxs, - from_comms_tx: from_comms_tx, - to_algo_rxs: to_algo_rxs, - from_algo_tx: from_algo_tx, + to_comms_rxs, + from_comms_tx, + to_algo_rxs, + from_algo_tx, + + stop_tx, + stop_rx, } } @@ -138,8 +148,13 @@ impl Messaging { &self.from_algo_tx } + /// Gives the ownership of the handle to stop the message receive loop. + pub fn stop_tx(&self) -> Sender<()> { + self.stop_tx.to_owned() + } + /// Spawns the message delivery thread in a given thread scope. - pub fn spawn<'a>(&self, scope: &Scope<'a>) + pub fn spawn<'a>(&self, scope: &Scope<'a>) -> ScopedJoinHandle<()> where T: 'a { let to_comms_txs = self.to_comms_txs.to_owned(); @@ -147,9 +162,12 @@ impl Messaging { let to_algo_txs = self.to_algo_txs.to_owned(); let from_algo_rx = self.from_algo_rx.to_owned(); + let stop_rx = self.stop_rx.to_owned(); + let mut stop = false; + scope.spawn(move || { // This loop forwards messages according to their metadata. - loop { select_loop! { + while !stop { select_loop! { recv(from_algo_rx, message) => { match message { TargetedMessage { @@ -183,8 +201,11 @@ impl Messaging { for tx in to_algo_txs.iter() { tx.send(message.clone()).unwrap(); } + }, + recv(stop_rx, _) => { + stop = true; } }} // end of select_loop! - }); + }) } } diff --git a/src/node.rs b/src/node.rs index 6209e15..4acb365 100644 --- a/src/node.rs +++ b/src/node.rs @@ -37,7 +37,8 @@ pub struct Node { value: Option } -impl> + Into>> +impl> + Into>> Node { /// Consensus node constructor. It only initialises initial parameters. @@ -61,6 +62,7 @@ impl> + Into>> let from_comms_tx = messaging.from_comms_tx(); let to_algo_rxs = messaging.to_algo_rxs(); let from_algo_tx = messaging.from_algo_tx(); + let stop_tx = messaging.stop_tx(); // All spawned threads will have exited by the end of the scope. crossbeam::scope(|scope| { @@ -132,6 +134,9 @@ impl> + Into>> }); } + // Stop the messaging task. + stop_tx.send(()).unwrap(); + // TODO: continue the implementation of the asynchronous common // subset algorithm. Err(Error::NotImplemented) diff --git a/src/stream_io.rs b/src/stream_io.rs deleted file mode 100644 index 76f5c1d..0000000 --- a/src/stream_io.rs +++ /dev/null @@ -1,21 +0,0 @@ -//! Abstract interface to serialised IO. 
- -use std::io; -use std::io::{Read, Write}; - -use proto::Message; // FIXME: Message should be made independent of the - // protobuf type MessageProto. - -/// Trait of types of streams carrying payload of type `Message` and -/// returning errors of type `Error`. -/// -/// This is a stream interface independent of the choice of serialisation -/// methods. -pub trait StreamIo -where Stream: Read + Write, T: Send + Sync -{ - fn from_stream(stream: Stream) -> Self; - fn try_clone(&self) -> Result where Self: Sized; - fn recv(&mut self) -> Result, Error>; - fn send(&mut self, m: Message) -> Result<(), Error>; -} diff --git a/tests/broadcast.rs b/tests/broadcast.rs index 25a2ed1..6bfd841 100644 --- a/tests/broadcast.rs +++ b/tests/broadcast.rs @@ -1,29 +1,40 @@ //! Integration test of the reliable broadcast protocol. extern crate hbbft; +#[macro_use] +extern crate log; +extern crate simple_logger; extern crate crossbeam; #[macro_use] extern crate crossbeam_channel; extern crate merkle; mod netsim; +mod node_comms; +use std::sync::Arc; use std::collections::HashSet; use std::fmt::Debug; use std::io; -use crossbeam_channel::{Sender, Receiver}; +use crossbeam::{Scope, ScopedJoinHandle}; +use crossbeam_channel::{bounded, Sender, Receiver}; use hbbft::proto::*; +use hbbft::messaging::{Messaging, SourcedMessage}; +use hbbft::broadcast; use netsim::NetSim; +use node_comms::CommsTask; /// This is a structure to start a consensus node. pub struct TestNode<'a> { - /// Node identifier - ident: usize, - /// TX handles, one for each other node + /// Node identifier. + node_index: usize, + /// Total number of nodes. + num_nodes: usize, + /// TX handles, one for each other node. txs: Vec<&'a Sender>>, - /// RX handle, one for each other node + /// RX handle, one for each other node. rxs: Vec<&'a Receiver>>, /// Optionally, a value to be broadcast by this node. value: Option @@ -32,52 +43,143 @@ pub struct TestNode<'a> { impl<'a> TestNode<'a> { /// Consensus node constructor. It only initialises initial parameters. - pub fn new(ident: usize, + pub fn new(node_index: usize, + num_nodes: usize, txs: Vec<&'a Sender>>, rxs: Vec<&'a Receiver>>, value: Option) -> Self { TestNode { - ident: ident, + node_index: node_index, + num_nodes: num_nodes, txs: txs, rxs: rxs, value: value } } - pub fn run(&self) -> Result, Error> { + pub fn run(&self, messaging: Messaging) -> + Result, Error> + { assert_eq!(self.rxs.len(), 3); - let mut result = None; - for n in 0..3 { - self.txs[n].send(Message::Broadcast( - BroadcastMessage::Ready(Vec::new())) - ).unwrap(); - } - while result.is_none() { - select_loop! { - recv(self.rxs[0], message) => { - println!("Node {}/0 received {:?}", self.ident, message); - result = Some(Err(Error::NotImplemented)); - } - recv(self.rxs[1], message) => { - println!("Node {}/1 received {:?}", self.ident, message); - result = Some(Err(Error::NotImplemented)); - } - recv(self.rxs[2], message) => { - println!("Node {}/2 received {:?}", self.ident, message); - result = Some(Err(Error::NotImplemented)); - } + + let to_comms_rxs = messaging.to_comms_rxs(); + let from_comms_tx = messaging.from_comms_tx(); + let to_algo_rxs = messaging.to_algo_rxs(); + let from_algo_tx = messaging.from_algo_tx(); + let ref to_algo_rx0 = to_algo_rxs[0]; + let value = self.value.to_owned(); + let num_nodes = self.num_nodes; + let mut values = HashSet::new(); + + crossbeam::scope(|scope| { + let mut handles = Vec::new(); + + // Spawn the 0-th instance corresponding to this node. 
The return + // value shall be equal to `value` if computation succeeded or error + // otherwise. + handles.push(scope.spawn(move || { + broadcast::Instance::new(from_algo_tx, + to_algo_rx0, + value, + num_nodes, + 0) + .run() + })); + + // Control TX handles to stop all comms threads. + let mut comms_stop_txs = Vec::new(); + + // Spawn instances 1 through num_nodes-1 together with simulated + // remote comms tasks. + for i in 1..num_nodes { + // Make a channel to be used to stop the comms task. + let (comms_stop_tx, comms_stop_rx): (Sender<()>, Receiver<()>) = + bounded(1); + // Record the TX handle for using it later. + comms_stop_txs.push(comms_stop_tx); + // Spawn the comms task. + scope.spawn(move || { + // Termination condition variable. + let mut stop = false; + + // Receive messages from the simulated node or locally. + while !stop { select_loop! { + // Receive from the simulated remote node. + recv(self.rxs[i-1], message) => { + debug!("Node {}/{} received {:?}", + self.node_index, i, message); + from_comms_tx.send( + SourcedMessage { + source: i, + message + }).unwrap(); + }, + // Receive from an algorithm via local + // messaging. Forward the message to the simulated + // remote node. + recv(to_comms_rxs[i-1], message) => { + self.txs[i-1].send(message).unwrap(); + } + recv(comms_stop_rx, _) => { + stop = true; + } + }} + }); + + let ref to_algo_rx = to_algo_rxs[i]; + + // Spawn a broadcast instance associated with the above comms + // task. + handles.push(scope.spawn(move || { + broadcast::Instance::new(from_algo_tx, + to_algo_rx, + None, + num_nodes, + i) + .run() + })); } - } - result.unwrap() + + let mut error = None; + + // Collect the values computed by broadcast instances. + for h in handles { + match h.join() { + Ok(v) => { + values.insert(v); + }, + Err(e) => { + error = Some(Error::Broadcast(e)); + } + }; + } + + // Stop the comms tasks. + for tx in comms_stop_txs { + tx.send(()).unwrap(); + } + + if error.is_some() { + Err(error.unwrap()) + } + else { + Ok(values) + } + }) } } -#[derive(Debug, PartialEq)] -pub enum Error { +#[derive(Clone, Debug)] +pub enum Error { + Broadcast(broadcast::Error), NotImplemented } +impl From> for Error { + fn from(e: broadcast::Error) -> Error { Error::Broadcast(e) } +} + #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub struct TestValue { pub value: String @@ -108,6 +210,7 @@ impl From for Vec { } } +/// Creates a vector of test nodes but does not run them. fn create_test_nodes<'a>(num_nodes: usize, net: &'a NetSim>) -> Vec> @@ -119,6 +222,7 @@ fn create_test_nodes<'a>(num_nodes: usize, }; let mut txs = Vec::new(); let mut rxs = Vec::new(); + // Set up comms channels to other nodes. for m in 0..num_nodes { if n == m { // Skip the channel back to the node itself. @@ -127,30 +231,48 @@ fn create_test_nodes<'a>(num_nodes: usize, txs.push(net.tx(n, m)); rxs.push(net.rx(m, n)); } - nodes.push(TestNode::new(n, txs, rxs, Some(value))); + nodes.push(TestNode::new(n, num_nodes, txs, rxs, Some(value))); } nodes } #[test] fn test_4_broadcast_nodes() { + simple_logger::init_with_level(log::Level::Debug).unwrap(); + const NUM_NODES: usize = 4; let net: NetSim> = NetSim::new(NUM_NODES); let nodes = create_test_nodes(NUM_NODES, &net); crossbeam::scope(|scope| { + let mut handles = Vec::new(); + let mut messaging_stop_txs = Vec::new(); for node in nodes { + // Start a local messaging service on the simulated node. + let messaging: Messaging = + Messaging::new(NUM_NODES); + messaging.spawn(scope); + // Take the thread control handle. 
+ messaging_stop_txs.push(messaging.stop_tx()); + handles.push(scope.spawn(move || { - node.run() + node.run(messaging) })); } // Compare the set of values returned by broadcast against the expected // set. for h in handles { - assert_eq!(h.join(), Err(Error::NotImplemented)); + assert!(match h.join() { + Err(Error::NotImplemented) => true, + _ => false + }); + } + // Stop all messaging tasks. + for tx in messaging_stop_txs { + tx.send(()).unwrap(); } }); } diff --git a/tests/node_comms.rs b/tests/node_comms.rs new file mode 100644 index 0000000..d1f3393 --- /dev/null +++ b/tests/node_comms.rs @@ -0,0 +1,92 @@ +//! Simulated comms task structure. A simulated comms task communicates with a +//! simulated remote node through a channel. Local communication with +//! coordinating threads is also made via a channel. + +extern crate hbbft; +extern crate crossbeam; +extern crate crossbeam_channel; + +use std::io; +use std::fmt::Debug; +use std::sync::Arc; +use crossbeam::{Scope, ScopedJoinHandle}; +use crossbeam_channel::{Sender, Receiver}; + +use hbbft::proto::Message; +use hbbft::messaging::SourcedMessage; + +#[derive(Debug)] +pub enum Error { + IoError(io::Error), +} + +impl From for Error { + fn from(err: io::Error) -> Error { Error::IoError(err) } +} + +/// A communication task connects a remote node to the thread that manages the +/// consensus algorithm. +pub struct CommsTask + <'a, T: 'a + Clone + Debug + Send + Sync + From> + Into>> +{ + /// The transmit side of the multiple producer channel from comms threads. + tx: &'a Sender>, + /// The receive side of the channel to the comms thread. + rx: &'a Receiver>, + /// TX to the remote node. + remote_tx: &'a Sender>, + /// RX from the remote node. + remote_rx: &'a Receiver>, + /// The index of this comms task for identification against its remote node. + pub node_index: usize +} + +impl <'a, T: 'a + Clone + Debug + Send + Sync + From> + Into>> + CommsTask<'a, T> +{ + pub fn new(tx: &'a Sender>, + rx: &'a Receiver>, + remote_tx: &'a Sender>, + remote_rx: &'a Receiver>, + node_index: usize) -> + Self + { + CommsTask { + tx: tx, + rx: rx, + remote_tx: remote_tx, + remote_rx: remote_rx, + node_index: node_index + } + } + + /// The main socket IO loop and an asynchronous thread responding to manager + /// thread requests. + pub fn spawn(&mut self, scope: &Scope<'a>) -> ScopedJoinHandle<()> { + // Borrow parts of `self` before entering the thread binding scope. + let tx = Arc::new(self.tx); + let rx = Arc::new(self.rx); + let remote_tx = Arc::new(self.remote_tx); + let remote_rx = Arc::new(self.remote_rx); + let node_index = self.node_index; + + scope.spawn(move || { + // FIXME: refactor to a while loop with clean termination + loop { select_loop! { + recv(rx, message) => { + println!("Node {} <- {:?}", node_index, message); + // Forward the message to the remote node. + remote_tx.send(message).unwrap(); + }, + recv(remote_rx, message) => { + println!("Node {} -> {:?}", node_index, message); + tx.send( + SourcedMessage { + source: node_index, + message + }).unwrap(); + } + }} + }) + } +}
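
A few sketches of the patterns this patch leans on follow; none of them are part of the patch itself. First, the erasure-coding comment in src/broadcast/mod.rs fixes the scheme at N - 2f data shards plus 2f parity shards, so any N - 2f of the N shards are enough to reconstruct the broadcast value. Assuming the reed_solomon_erasure constructor that broadcast/mod.rs already uses, the shard counts are derived roughly like this (function name and node counts here are illustrative):

    extern crate reed_solomon_erasure;

    use reed_solomon_erasure::ReedSolomon;

    /// Derives the shard counts for `num_nodes` nodes of which at most
    /// `num_faulty_nodes` are faulty (assumes num_nodes > 2 * num_faulty_nodes).
    fn coding_for(num_nodes: usize, num_faulty_nodes: usize) -> ReedSolomon {
        let parity_shard_num = 2 * num_faulty_nodes;        // 2f
        let data_shard_num = num_nodes - parity_shard_num;  // N - 2f
        // For example, N = 4 and f = 1 give 2 data shards and 2 parity shards.
        ReedSolomon::new(data_shard_num, parity_shard_num)
            .expect("both shard counts must be positive")
    }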
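
The thread-termination issue flagged in the commit message is what the new stop channel in src/messaging.rs is aimed at: a bounded(1) control channel whose receiver is polled in the same select_loop! as the data channels, plus the ScopedJoinHandle now returned from spawn so the caller can join after signalling. Reduced to a standalone sketch (channel contents and names are illustrative, not from the patch):

    #[macro_use]
    extern crate crossbeam_channel;
    extern crate crossbeam;

    use crossbeam_channel::{bounded, unbounded};

    fn main() {
        let (data_tx, data_rx) = unbounded::<u32>();
        let (stop_tx, stop_rx) = bounded::<()>(1);

        crossbeam::scope(|scope| {
            let handle = scope.spawn(move || {
                let mut stop = false;
                // Each select_loop! pass handles one message or the stop
                // signal, after which the flag is re-checked.
                while !stop { select_loop! {
                    recv(data_rx, n) => { println!("got {}", n); },
                    recv(stop_rx, _) => { stop = true; }
                }}
            });

            data_tx.send(1).unwrap();
            stop_tx.send(()).unwrap(); // request termination
            handle.join();             // wait for the loop to exit
        });
    }

Joining explicitly is optional inside a crossbeam scope, since the scope joins all of its threads when it ends (node.rs relies on exactly that), but keeping the handle returned by Messaging::spawn lets the caller make the shutdown order visible.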
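
The forwarding rule inside the messaging loop follows from the Target type: a message coming from an algorithm either fans out to every comms task or goes to exactly one of them. A condensed sketch of that dispatch; the Node(usize) variant is assumed here, since only the Target::All arm is visible in this patch:

    extern crate crossbeam_channel;

    use std::fmt::Debug;
    use crossbeam_channel::Sender;

    /// Stand-in for hbbft::messaging::Target; the Node variant is assumed.
    pub enum Target {
        All,
        Node(usize),
    }

    /// `txs` stands in for Messaging::to_comms_txs, one sender per comms task.
    fn route<T: Clone + Debug>(target: Target, message: T, txs: &[Sender<T>]) {
        match target {
            // Fan the message out to every comms task.
            Target::All => for tx in txs {
                tx.send(message.clone()).unwrap();
            },
            // Deliver the message to a single comms task.
            Target::Node(i) => txs[i].send(message.clone()).unwrap(),
        }
    }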
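
On the test side, broadcast only requires that the payload type convert to and from a byte vector, which is why the TestValue wrapper around a String is sufficient. A minimal sketch of the two conversions, assuming UTF-8 payloads (the impls in tests/broadcast.rs may handle invalid UTF-8 differently):

    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    pub struct TestValue {
        pub value: String,
    }

    impl From<TestValue> for Vec<u8> {
        fn from(v: TestValue) -> Vec<u8> {
            v.value.into_bytes()
        }
    }

    impl From<Vec<u8>> for TestValue {
        fn from(bytes: Vec<u8>) -> TestValue {
            TestValue {
                value: String::from_utf8(bytes)
                    .expect("test values are valid UTF-8"),
            }
        }
    }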
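
Finally, tests/node_comms.rs still carries a FIXME about turning its endless forwarding loop into a while loop with clean termination. Assuming the same bounded(1) stop channel used elsewhere in this patch is acceptable there too, the loop inside CommsTask::spawn could take roughly this shape (the stop_rx handle is hypothetical and would have to be threaded into the task):

    // Inside CommsTask::spawn, with a hypothetical `stop_rx: Receiver<()>`
    // passed in alongside `rx` and `remote_rx`:
    scope.spawn(move || {
        let mut stop = false;
        while !stop { select_loop! {
            recv(rx, message) => {
                // Forward a local message to the simulated remote node.
                remote_tx.send(message).unwrap();
            },
            recv(remote_rx, message) => {
                // Tag a remote message with its source and forward it locally.
                tx.send(SourcedMessage { source: node_index, message }).unwrap();
            },
            recv(stop_rx, _) => { stop = true; }
        }}
    })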