diff --git a/src/messaging.rs b/src/messaging.rs index b351a12..46812ce 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -1,7 +1,9 @@ //! The local message delivery system. use std::collections::{HashSet, HashMap, VecDeque}; use std::fmt::Debug; +use std::mem; use std::net::SocketAddr; +use std::rc::Rc; use crossbeam::{Scope, ScopedJoinHandle}; use crossbeam_channel; use crossbeam_channel::{bounded, unbounded, Sender, Receiver}; @@ -40,22 +42,47 @@ pub enum AlgoMessage { } /// A message sent between algorithm instances. -pub struct RoutedMessage { - /// Identifier of the algorithm that sent the message. - src: Algorithm, +pub struct LocalMessage { + // /// Identifier of the algorithm that sent the message. + // src: Algorithm, /// Identifier of the message destination algorithm. dst: Algorithm, /// Payload message: AlgoMessage } +/// The message destinations corresponding to a remote node `i`. It can be +/// either of the two: +/// +/// 1) `All`: all nodes if sent to socket tasks, or all local algorithm +/// instances if received from socket tasks. +/// +/// 2) `Node(i)`: node `i` or local algorithm instances with the node index `i`. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum RemoteNode { + All, + Node(SocketAddr) +} + +/// Message with a designated target. +#[derive(Clone, Debug, PartialEq)] +pub struct RemoteMessage { + pub node: RemoteNode, + pub message: Message +} + +pub enum QMessage { + Local(LocalMessage), + Remote(RemoteMessage) +} + /// States of the message loop consided as an automaton with output. There is /// one exit state `Finished` and one transitional (also initial) state /// `Processing` whose argument is a queue of messages that are output in order /// to be sent to remote nodes. #[derive(Clone, PartialEq)] pub enum MessageLoopState { - Processing(VecDeque>), + Processing(VecDeque), Finished } @@ -68,32 +95,55 @@ impl MessageLoopState { false } } + + /// Appends pending messages of another state. Used to append messages + /// emitted by the handler to the messages already queued from previous + /// iterations of a message handling loop. + pub fn append(&mut self, other: &mut MessageLoopState) { + if let MessageLoopState::Processing(ref mut new_msgs) = other + { + if let MessageLoopState::Processing(ref mut msgs) = self + { + msgs.append(new_msgs); + } + } + } } /// The queue functionality for messages sent between algorithm instances. pub struct MessageQueue { - algos: HashMap + /// Algorithm message handlers. Every message handler receives a message and + /// the TX handle of the incoming message queue. + algos: HashMap) -> Result>>, - /// The queue of local messages. - queue: VecDeque, - /// TX handles to remote nodes. - remote_txs: HashMap>>, + /// The queue of local and remote messages. + /// TODO: Arc(Mutex(_)) for pushing from socket threads. + queue: VecDeque, + /// TX handle of the message queue. + queue_tx: Sender, + /// RX handle of the message queue. + queue_rx: Receiver, + /// Remote send function + send_remote: fn(&VecDeque) } impl MessageQueue { - pub fn new(remote_txs: HashMap>>) -> Self + pub fn new(send_remote: fn(&VecDeque)) -> + Self { + let (queue_tx, queue_rx) = unbounded(); MessageQueue { algos: HashMap::new(), queue: VecDeque::new(), - remote_txs + queue_tx, + queue_rx, + send_remote } } /// Registers a handler for messages sent to the given algorithm. pub fn insert_algo(&mut self, algo: Algorithm, - handler: Box + handler: Box) -> Result>) { let _ = self.algos.insert(algo, handler).unwrap(); @@ -106,76 +156,87 @@ impl MessageQueue { /// Places a message at the end of the queue for routing to the destination /// later. - pub fn push(&mut self, message: RoutedMessage) { + /// + /// FIXME: Thread-safe interface. + pub fn push(&mut self, message: QMessage) { self.queue.push_back(message); } /// Removes and returns the message from the front of the queue if the queue /// is not empty. - pub fn pop(&mut self) -> Option { - self.queue.pop_front() - } + // pub fn pop(&mut self) -> Option { + // self.queue.pop_front() + // } - /// Message delivery routine. - pub fn deliver(&mut self) -> Result { - let mut result = Ok(MessageLoopState::Processing(VecDeque::new())); - let mut queue_empty = false; - - while !queue_empty && result.is_ok() { - if let Some(RoutedMessage { - src: ref _src, - ref dst, - ref message - }) = self.pop() { - if let Some(handler) = self.algos.get(dst) { - result = handler(message).map_err(Error::from) - .map(|new_state| { - if let MessageLoopState::Processing(mut new_msgs) - = new_state.to_owned() - { - if let Ok(MessageLoopState::Processing(mut msgs)) - = result - { - // Append the messages to remote nodes - // emitted by the handler to the messages - // already queued. - msgs.append(&mut new_msgs); - MessageLoopState::Processing( - msgs - ) - } - else { - new_state - } - } - else { - new_state - } - }); - } - else { - result = Err(Error::NoSuchAlgorithm); - } - } - else { - queue_empty = true; - } - } - result - } - - /// Send a message to a remote node. - pub fn send_remote(&mut self, - addr: &SocketAddr, - message: Message) -> - Result<(), Error> + /// The message loop. + pub fn message_loop(&mut self) -> Result { - if let Some(tx) = self.remote_txs.get(addr) { - tx.send(message).map_err(Error::from) - } - else { - Err(Error::NoSuchRemote) - } + let mut result = Ok(MessageLoopState::Processing(VecDeque::new())); + + while let Ok(mut state) = result { + // Send any outgoing messages to remote nodes using the provided + // function. + if let MessageLoopState::Processing(messages) = state { + // TODO: error handling + (self.send_remote)(&messages); + state = MessageLoopState::Processing(VecDeque::new()) + } + + // Receive local and remote messages. + if let Ok(m) = self.queue_rx.recv() { match m { + QMessage::Local(LocalMessage { + dst, + message + }) => { + result = if let Some(handler) = self.algos.get(&dst) { + let mut new_result = handler(&QMessage::Local( + LocalMessage { + dst, + message + }), self.queue_tx.clone() + ).map_err(Error::from); + if let Ok(ref mut new_state) = new_result { + state.append(new_state); + Ok(state) + } + else { + // Error overrides the previous state. + new_result + } + } + else { + Err(Error::NoSuchAlgorithm) + } + } + + QMessage::Remote(RemoteMessage { + node, + message + }) => { + // Multicast the message to all algorithm instances, + // collecting output messages iteratively and appending them + // to result. + result = self.algos.iter() + .fold(Ok(state), + |result1, (_, handler)| { + if let Ok(mut state1) = result1 { + handler(&QMessage::Remote(RemoteMessage { + node: node.clone(), + message: message.clone() + }), self.queue_tx.clone() + ).map(|ref mut state2| { + state1.append(state2); + state1 + }).map_err(Error::from) + } + else { + result1 + } + } + ) + } + }} else { result = Err(Error::RecvError) }} // end of while loop + result } } @@ -413,6 +474,7 @@ pub trait AlgoError { pub enum Error { NoSuchAlgorithm, NoSuchRemote, + RecvError, AlgoError(&'static str), NoSuchTarget, SendError, diff --git a/tests/broadcast.rs b/tests/broadcast.rs index a2be193..c89c019 100644 --- a/tests/broadcast.rs +++ b/tests/broadcast.rs @@ -11,213 +11,79 @@ extern crate merkle; mod netsim; -use std::sync::Arc; -use std::collections::{HashSet, HashMap, VecDeque}; +use std::collections::{BTreeMap, HashSet, HashMap, VecDeque}; use std::fmt; use std::fmt::Debug; use std::io; +use std::net::SocketAddr; +use std::rc::Rc; use crossbeam::{Scope, ScopedJoinHandle}; use crossbeam_channel::{bounded, Sender, Receiver}; use hbbft::proto::*; use hbbft::messaging; use hbbft::messaging::{AlgoError, Algorithm, ProposedValue, AlgoMessage, - MessageLoopState, MessageQueue, - Messaging, SourcedMessage}; + MessageLoopState, MessageQueue, RemoteMessage}; use hbbft::broadcast; use netsim::NetSim; /// This is a structure to start a consensus node. -pub struct TestNode<'a> { +#[derive(Debug)] +pub struct TestNode { /// Node identifier. node_index: usize, /// Total number of nodes. num_nodes: usize, - /// TX handles, one for each other node. - txs: Vec<&'a Sender>>>, - /// RX handle, one for each other node. - rxs: Vec<&'a Receiver>>>, + /// TX handles indexed with the receiving node address. One handle for each + /// other node. + txs: HashMap>>>, + /// RX handle indexed with the transmitting node address. One handle for + /// each other node. + rxs: HashMap>>>, /// Optionally, a value to be broadcast by this node. - value: Option + value: Option } -impl<'a> TestNode<'a> +impl TestNode { /// Consensus node constructor. It only initialises initial parameters. pub fn new(node_index: usize, num_nodes: usize, - txs: Vec<&'a Sender>>>, - rxs: Vec<&'a Receiver>>>, - value: Option) -> Self + txs: HashMap>>>, + rxs: HashMap>>>, + value: Option) -> Self { TestNode { - node_index: node_index, - num_nodes: num_nodes, - txs: txs, - rxs: rxs, - value: value + node_index, + num_nodes, + txs, + rxs, + value } } - pub fn handle(&self, message: &AlgoMessage) -> - Result + pub fn run(&self) -> + Result, Error> { - Err(TestAlgoError::TestError) + let mut stop = false; + + // FIXME: localise to the Node context. + // let f: fn(&VecDeque) = self.send_remote; + let mut mq: MessageQueue = MessageQueue::new( + send_remote + ); + + Err(Error::NotImplemented) } - pub fn run(&self, messaging: Messaging>) -> - Result, Error> - { - assert_eq!(self.rxs.len(), self.num_nodes - 1); - - let to_comms_rxs = messaging.to_comms_rxs(); - let from_comms_tx = messaging.from_comms_tx(); - let to_algo_rxs = messaging.to_algo_rxs(); - let from_algo_tx = messaging.from_algo_tx(); - let ref to_algo_rx0 = to_algo_rxs[0]; - let value = self.value.to_owned(); - let num_nodes = self.num_nodes; - let mut values = HashSet::new(); - - crossbeam::scope(|scope| { - let mut handles = Vec::new(); - - // Spawn the 0-th instance corresponding to this node. The return - // value shall be equal to `value` if computation succeeded or error - // otherwise. - handles.push(scope.spawn(move || { - broadcast::Instance::new(from_algo_tx, - to_algo_rx0, - value, - num_nodes, - 0) - .run() - })); - - // Control TX handles to stop all comms threads. - let mut comms_stop_txs = Vec::new(); - - // Spawn instances 1 through num_nodes-1 together with simulated - // remote comms tasks. - for i in 1..num_nodes { - // Make a channel to be used to stop the comms task. - let (comms_stop_tx, comms_stop_rx): (Sender<()>, Receiver<()>) = - bounded(1); - // Record the TX handle for using it later. - comms_stop_txs.push(comms_stop_tx); - // Spawn the comms task. - scope.spawn(move || { - // Termination condition variable. - let mut stop = false; - - // Receive messages from the simulated node or locally. - while !stop { select_loop! { - // Receive from the simulated remote node. - recv(self.rxs[i-1], message) => { - debug!("Node {}/{} received {:?}", - self.node_index, i, message); - from_comms_tx.send( - SourcedMessage { - source: i, - message - }).map_err(|e| { - error!("{}", e); - }).unwrap(); - }, - // Receive from an algorithm via local - // messaging. Forward the message to the simulated - // remote node. - recv(to_comms_rxs[i-1], message) => { - self.txs[i-1].send(message).map_err(|e| { - error!("{}", e); - }).unwrap(); - } - recv(comms_stop_rx, _) => { - debug!("Stopping comms task {}/{}", - self.node_index, i); - stop = true; - } - }} - }); - - let ref to_algo_rx = to_algo_rxs[i]; - - // Spawn a broadcast instance associated with the above comms - // task. - handles.push(scope.spawn(move || { - broadcast::Instance::new(from_algo_tx, - to_algo_rx, - None, - num_nodes, - i) - .run() - })); - } - - // Collect the values computed by broadcast instances. - let final_result = handles.into_iter().fold(Ok(()), |result, h| { - if result.is_ok() { - match h.join() { - Ok(v) => { - debug!("Received value {:?}", v); - values.insert(v); - Ok(()) - }, - Err(e) => Err(Error::Broadcast(e)) - } - } - else { - result - } - }).and_then(|_| Ok(values)); - - // Stop the comms tasks. - for tx in comms_stop_txs { - tx.send(()).map_err(|e| { - error!("{}", e); - }).unwrap(); - } - - // NEW START - let mut stop = false; - let mut mq: MessageQueue = MessageQueue::new( - HashMap::new() // FIXME: TX handles to comms tasks - ); - // Set the initial state of the message loop to Processing. - let mut loop_result = Ok(MessageLoopState::Processing( - VecDeque::new() - )); - - while !stop { - match loop_result { - Ok(MessageLoopState::Processing(msgs)) => { - // Send messages to remote nodes. FIXME. - - // Deliver all messages locally. Message handlers queue - // local messages and output messages to remote nodes. - loop_result = mq.deliver(); - } - Ok(MessageLoopState::Finished) => { - stop = true; - } - Err(ref e) => { - error!("Error: {:?}", e); - stop = true; - } - } - } - // NEW END - - final_result - }) + fn send_remote(&self, messages: &VecDeque) { + // FIXME } +} - pub fn handle_message(&self, m: messaging::RoutedMessage) -> - Result<(), Error> - { - Ok(()) - } +fn send_remote(messages: &VecDeque) { + // FIXME } #[derive(Clone, Debug)] @@ -230,69 +96,30 @@ impl From for Error { fn from(e: broadcast::Error) -> Error { Error::Broadcast(e) } } -#[derive(Clone, Hash, PartialEq, Eq)] -pub struct TestValue { - pub value: String -} - -impl Debug for TestValue { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.value)?; - Ok(()) - } -} - -/// `TestValue: merkle::Hashable` is derived from `TestValue: AsRef<[u8]>`. -impl AsRef<[u8]> for TestValue { - fn as_ref(&self) -> &[u8] { - self.value.as_ref() - } -} - -impl From> for TestValue { - fn from(bytes: Vec) -> TestValue { - TestValue { - value: String::from_utf8(bytes).expect("Found invalid UTF-8") - // conversion from UTF-8 often panics: - // String::from_utf8(bytes).expect("Found invalid UTF-8") - } - } -} - -impl From for Vec { - fn from(v: TestValue) -> Vec { - match v { - TestValue { value } => { - value.as_bytes().to_vec() - } - } - } -} - -fn test_value_fmt(n: usize) -> TestValue { - TestValue { - value: format!("-{}-{}-{}-", n, n, n) - } +fn proposed_value(n: usize) -> ProposedValue { + let b: u8 = (n & 0xff) as u8; + vec![b; 10] } /// Creates a vector of test nodes but does not run them. -fn create_test_nodes<'a>(num_nodes: usize, - net: &'a NetSim>>) -> - Vec> +fn create_test_nodes(num_nodes: usize, + net: &NetSim>>) -> + Vec { let mut nodes = Vec::new(); for n in 0..num_nodes { - let value = test_value_fmt(n); - let mut txs = Vec::new(); - let mut rxs = Vec::new(); + let value = proposed_value(n); + let mut txs = HashMap::new(); + let mut rxs = HashMap::new(); // Set up comms channels to other nodes. for m in 0..num_nodes { if n == m { // Skip the channel back to the node itself. continue; } - txs.push(net.tx(n, m)); - rxs.push(net.rx(m, n)); + let addr = format!("127.0.0.1:{}", m).parse().unwrap(); + txs.insert(addr, net.tx(n, m)); + rxs.insert(addr, net.rx(m, n)); } nodes.push(TestNode::new(n, num_nodes, txs, rxs, Some(value))); } @@ -319,53 +146,11 @@ fn test_4_broadcast_nodes() { let nodes = create_test_nodes(NUM_NODES, &net); crossbeam::scope(|scope| { - - let mut handles = Vec::new(); - let mut messaging_stop_txs = Vec::new(); - let mut msg_handles = Vec::new(); - for node in nodes { - // Start a local messaging service on the simulated node. - let messaging: Messaging> = - Messaging::new(NUM_NODES); - // Take the handle to receive the result after the thread finishes. - msg_handles.push(messaging.spawn(scope)); - // Take the thread control handle. - messaging_stop_txs.push(messaging.stop_tx()); - - handles.push(scope.spawn(move || { - node.run(messaging) - })); - } - - // Compare the set of values returned by broadcast against the expected - // set. - for h in handles { - match h.join() { - Err(Error::NotImplemented) => panic!(), - Err(err) => panic!("Error: {:?}", err), - Ok(v) => { - panic!("End of test"); - let mut expected = HashSet::new(); - for n in 0..NUM_NODES { - expected.insert(test_value_fmt(n)); - } - debug!("Finished with values {:?}", v); - assert_eq!(v, expected); - }, - } - } - // Stop all messaging tasks. - for tx in messaging_stop_txs { - tx.send(()).map_err(|e| { - error!("{}", e); - }).unwrap(); - } - for (i, h) in msg_handles.into_iter().enumerate() { - match h.join() { - Ok(()) => debug!("Messaging[{}] stopped OK", i), - Err(e) => debug!("Messaging[{}] error: {:?}", i, e) - } + scope.spawn(move || { + debug!("Running {:?}", node); + node.run().unwrap(); + }); } }); } diff --git a/tests/netsim.rs b/tests/netsim.rs index 137af72..f101884 100644 --- a/tests/netsim.rs +++ b/tests/netsim.rs @@ -38,18 +38,18 @@ impl NetSim { } /// The TX side of a channel from node `src` to node `dst`. - pub fn tx(&self, src: usize, dst: usize) -> &Sender { + pub fn tx(&self, src: usize, dst: usize) -> Sender { assert!(src < self.num_nodes); assert!(dst < self.num_nodes); - &self.txs[src * self.num_nodes + dst] + self.txs[src * self.num_nodes + dst].clone() } /// The RX side of a channel from node `src` to node `dst`. - pub fn rx(&self, src: usize, dst: usize) -> &Receiver { + pub fn rx(&self, src: usize, dst: usize) -> Receiver { assert!(src < self.num_nodes); assert!(dst < self.num_nodes); - &self.rxs[src * self.num_nodes + dst] + self.rxs[src * self.num_nodes + dst].clone() } }