wrote essentials of the message loop and updated the message queue with required handles

This commit is contained in:
Vladimir Komendantskiy 2018-04-21 01:29:54 +01:00
parent 875f3cecbe
commit b5e47d9a4f
3 changed files with 196 additions and 349 deletions

View File

@ -1,7 +1,9 @@
//! The local message delivery system.
use std::collections::{HashSet, HashMap, VecDeque};
use std::fmt::Debug;
use std::mem;
use std::net::SocketAddr;
use std::rc::Rc;
use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel;
use crossbeam_channel::{bounded, unbounded, Sender, Receiver};
@ -40,22 +42,47 @@ pub enum AlgoMessage {
}
/// A message sent between algorithm instances.
pub struct RoutedMessage {
/// Identifier of the algorithm that sent the message.
src: Algorithm,
pub struct LocalMessage {
// /// Identifier of the algorithm that sent the message.
// src: Algorithm,
/// Identifier of the message destination algorithm.
dst: Algorithm,
/// Payload
message: AlgoMessage
}
/// The message destinations corresponding to a remote node `i`. It can be
/// either of the two:
///
/// 1) `All`: all nodes if sent to socket tasks, or all local algorithm
/// instances if received from socket tasks.
///
/// 2) `Node(i)`: node `i` or local algorithm instances with the node index `i`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteNode {
All,
Node(SocketAddr)
}
/// Message with a designated target.
#[derive(Clone, Debug, PartialEq)]
pub struct RemoteMessage {
pub node: RemoteNode,
pub message: Message<ProposedValue>
}
pub enum QMessage {
Local(LocalMessage),
Remote(RemoteMessage)
}
/// States of the message loop consided as an automaton with output. There is
/// one exit state `Finished` and one transitional (also initial) state
/// `Processing` whose argument is a queue of messages that are output in order
/// to be sent to remote nodes.
#[derive(Clone, PartialEq)]
pub enum MessageLoopState {
Processing(VecDeque<TargetedMessage<ProposedValue>>),
Processing(VecDeque<RemoteMessage>),
Finished
}
@ -68,32 +95,55 @@ impl MessageLoopState {
false
}
}
/// Appends pending messages of another state. Used to append messages
/// emitted by the handler to the messages already queued from previous
/// iterations of a message handling loop.
pub fn append(&mut self, other: &mut MessageLoopState) {
if let MessageLoopState::Processing(ref mut new_msgs) = other
{
if let MessageLoopState::Processing(ref mut msgs) = self
{
msgs.append(new_msgs);
}
}
}
}
/// The queue functionality for messages sent between algorithm instances.
pub struct MessageQueue<HandlerError: AlgoError> {
algos: HashMap<Algorithm, Box<Fn(&AlgoMessage) ->
/// Algorithm message handlers. Every message handler receives a message and
/// the TX handle of the incoming message queue.
algos: HashMap<Algorithm, Box<Fn(&QMessage, Sender<QMessage>) ->
Result<MessageLoopState, HandlerError>>>,
/// The queue of local messages.
queue: VecDeque<RoutedMessage>,
/// TX handles to remote nodes.
remote_txs: HashMap<SocketAddr, Sender<Message<ProposedValue>>>,
/// The queue of local and remote messages.
/// TODO: Arc(Mutex(_)) for pushing from socket threads.
queue: VecDeque<QMessage>,
/// TX handle of the message queue.
queue_tx: Sender<QMessage>,
/// RX handle of the message queue.
queue_rx: Receiver<QMessage>,
/// Remote send function
send_remote: fn(&VecDeque<RemoteMessage>)
}
impl<HandlerError: AlgoError> MessageQueue<HandlerError> {
pub fn new(remote_txs: HashMap<SocketAddr,
Sender<Message<ProposedValue>>>) -> Self
pub fn new(send_remote: fn(&VecDeque<RemoteMessage>)) ->
Self
{
let (queue_tx, queue_rx) = unbounded();
MessageQueue {
algos: HashMap::new(),
queue: VecDeque::new(),
remote_txs
queue_tx,
queue_rx,
send_remote
}
}
/// Registers a handler for messages sent to the given algorithm.
pub fn insert_algo(&mut self, algo: Algorithm,
handler: Box<Fn(&AlgoMessage) ->
handler: Box<Fn(&QMessage, Sender<QMessage>) ->
Result<MessageLoopState, HandlerError>>)
{
let _ = self.algos.insert(algo, handler).unwrap();
@ -106,76 +156,87 @@ impl<HandlerError: AlgoError> MessageQueue<HandlerError> {
/// Places a message at the end of the queue for routing to the destination
/// later.
pub fn push(&mut self, message: RoutedMessage) {
///
/// FIXME: Thread-safe interface.
pub fn push(&mut self, message: QMessage) {
self.queue.push_back(message);
}
/// Removes and returns the message from the front of the queue if the queue
/// is not empty.
pub fn pop(&mut self) -> Option<RoutedMessage> {
self.queue.pop_front()
}
// pub fn pop(&mut self) -> Option<QMessage> {
// self.queue.pop_front()
// }
/// Message delivery routine.
pub fn deliver(&mut self) -> Result<MessageLoopState, Error> {
let mut result = Ok(MessageLoopState::Processing(VecDeque::new()));
let mut queue_empty = false;
while !queue_empty && result.is_ok() {
if let Some(RoutedMessage {
src: ref _src,
ref dst,
ref message
}) = self.pop() {
if let Some(handler) = self.algos.get(dst) {
result = handler(message).map_err(Error::from)
.map(|new_state| {
if let MessageLoopState::Processing(mut new_msgs)
= new_state.to_owned()
{
if let Ok(MessageLoopState::Processing(mut msgs))
= result
{
// Append the messages to remote nodes
// emitted by the handler to the messages
// already queued.
msgs.append(&mut new_msgs);
MessageLoopState::Processing(
msgs
)
}
else {
new_state
}
}
else {
new_state
}
});
}
else {
result = Err(Error::NoSuchAlgorithm);
}
}
else {
queue_empty = true;
}
}
result
}
/// Send a message to a remote node.
pub fn send_remote(&mut self,
addr: &SocketAddr,
message: Message<ProposedValue>) ->
Result<(), Error>
/// The message loop.
pub fn message_loop(&mut self) -> Result<MessageLoopState, Error>
{
if let Some(tx) = self.remote_txs.get(addr) {
tx.send(message).map_err(Error::from)
}
else {
Err(Error::NoSuchRemote)
}
let mut result = Ok(MessageLoopState::Processing(VecDeque::new()));
while let Ok(mut state) = result {
// Send any outgoing messages to remote nodes using the provided
// function.
if let MessageLoopState::Processing(messages) = state {
// TODO: error handling
(self.send_remote)(&messages);
state = MessageLoopState::Processing(VecDeque::new())
}
// Receive local and remote messages.
if let Ok(m) = self.queue_rx.recv() { match m {
QMessage::Local(LocalMessage {
dst,
message
}) => {
result = if let Some(handler) = self.algos.get(&dst) {
let mut new_result = handler(&QMessage::Local(
LocalMessage {
dst,
message
}), self.queue_tx.clone()
).map_err(Error::from);
if let Ok(ref mut new_state) = new_result {
state.append(new_state);
Ok(state)
}
else {
// Error overrides the previous state.
new_result
}
}
else {
Err(Error::NoSuchAlgorithm)
}
}
QMessage::Remote(RemoteMessage {
node,
message
}) => {
// Multicast the message to all algorithm instances,
// collecting output messages iteratively and appending them
// to result.
result = self.algos.iter()
.fold(Ok(state),
|result1, (_, handler)| {
if let Ok(mut state1) = result1 {
handler(&QMessage::Remote(RemoteMessage {
node: node.clone(),
message: message.clone()
}), self.queue_tx.clone()
).map(|ref mut state2| {
state1.append(state2);
state1
}).map_err(Error::from)
}
else {
result1
}
}
)
}
}} else { result = Err(Error::RecvError) }} // end of while loop
result
}
}
@ -413,6 +474,7 @@ pub trait AlgoError {
pub enum Error {
NoSuchAlgorithm,
NoSuchRemote,
RecvError,
AlgoError(&'static str),
NoSuchTarget,
SendError,

View File

@ -11,213 +11,79 @@ extern crate merkle;
mod netsim;
use std::sync::Arc;
use std::collections::{HashSet, HashMap, VecDeque};
use std::collections::{BTreeMap, HashSet, HashMap, VecDeque};
use std::fmt;
use std::fmt::Debug;
use std::io;
use std::net::SocketAddr;
use std::rc::Rc;
use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel::{bounded, Sender, Receiver};
use hbbft::proto::*;
use hbbft::messaging;
use hbbft::messaging::{AlgoError, Algorithm, ProposedValue, AlgoMessage,
MessageLoopState, MessageQueue,
Messaging, SourcedMessage};
MessageLoopState, MessageQueue, RemoteMessage};
use hbbft::broadcast;
use netsim::NetSim;
/// This is a structure to start a consensus node.
pub struct TestNode<'a> {
#[derive(Debug)]
pub struct TestNode {
/// Node identifier.
node_index: usize,
/// Total number of nodes.
num_nodes: usize,
/// TX handles, one for each other node.
txs: Vec<&'a Sender<Message<Vec<u8>>>>,
/// RX handle, one for each other node.
rxs: Vec<&'a Receiver<Message<Vec<u8>>>>,
/// TX handles indexed with the receiving node address. One handle for each
/// other node.
txs: HashMap<SocketAddr, Sender<Message<Vec<u8>>>>,
/// RX handle indexed with the transmitting node address. One handle for
/// each other node.
rxs: HashMap<SocketAddr, Receiver<Message<Vec<u8>>>>,
/// Optionally, a value to be broadcast by this node.
value: Option<TestValue>
value: Option<ProposedValue>
}
impl<'a> TestNode<'a>
impl TestNode
{
/// Consensus node constructor. It only initialises initial parameters.
pub fn new(node_index: usize,
num_nodes: usize,
txs: Vec<&'a Sender<Message<Vec<u8>>>>,
rxs: Vec<&'a Receiver<Message<Vec<u8>>>>,
value: Option<TestValue>) -> Self
txs: HashMap<SocketAddr, Sender<Message<Vec<u8>>>>,
rxs: HashMap<SocketAddr, Receiver<Message<Vec<u8>>>>,
value: Option<ProposedValue>) -> Self
{
TestNode {
node_index: node_index,
num_nodes: num_nodes,
txs: txs,
rxs: rxs,
value: value
node_index,
num_nodes,
txs,
rxs,
value
}
}
pub fn handle(&self, message: &AlgoMessage) ->
Result<MessageLoopState, TestAlgoError>
pub fn run(&self) ->
Result<HashSet<ProposedValue>, Error>
{
Err(TestAlgoError::TestError)
let mut stop = false;
// FIXME: localise to the Node context.
// let f: fn(&VecDeque<RemoteMessage>) = self.send_remote;
let mut mq: MessageQueue<TestAlgoError> = MessageQueue::new(
send_remote
);
Err(Error::NotImplemented)
}
pub fn run(&self, messaging: Messaging<Vec<u8>>) ->
Result<HashSet<TestValue>, Error>
{
assert_eq!(self.rxs.len(), self.num_nodes - 1);
let to_comms_rxs = messaging.to_comms_rxs();
let from_comms_tx = messaging.from_comms_tx();
let to_algo_rxs = messaging.to_algo_rxs();
let from_algo_tx = messaging.from_algo_tx();
let ref to_algo_rx0 = to_algo_rxs[0];
let value = self.value.to_owned();
let num_nodes = self.num_nodes;
let mut values = HashSet::new();
crossbeam::scope(|scope| {
let mut handles = Vec::new();
// Spawn the 0-th instance corresponding to this node. The return
// value shall be equal to `value` if computation succeeded or error
// otherwise.
handles.push(scope.spawn(move || {
broadcast::Instance::new(from_algo_tx,
to_algo_rx0,
value,
num_nodes,
0)
.run()
}));
// Control TX handles to stop all comms threads.
let mut comms_stop_txs = Vec::new();
// Spawn instances 1 through num_nodes-1 together with simulated
// remote comms tasks.
for i in 1..num_nodes {
// Make a channel to be used to stop the comms task.
let (comms_stop_tx, comms_stop_rx): (Sender<()>, Receiver<()>) =
bounded(1);
// Record the TX handle for using it later.
comms_stop_txs.push(comms_stop_tx);
// Spawn the comms task.
scope.spawn(move || {
// Termination condition variable.
let mut stop = false;
// Receive messages from the simulated node or locally.
while !stop { select_loop! {
// Receive from the simulated remote node.
recv(self.rxs[i-1], message) => {
debug!("Node {}/{} received {:?}",
self.node_index, i, message);
from_comms_tx.send(
SourcedMessage {
source: i,
message
}).map_err(|e| {
error!("{}", e);
}).unwrap();
},
// Receive from an algorithm via local
// messaging. Forward the message to the simulated
// remote node.
recv(to_comms_rxs[i-1], message) => {
self.txs[i-1].send(message).map_err(|e| {
error!("{}", e);
}).unwrap();
}
recv(comms_stop_rx, _) => {
debug!("Stopping comms task {}/{}",
self.node_index, i);
stop = true;
}
}}
});
let ref to_algo_rx = to_algo_rxs[i];
// Spawn a broadcast instance associated with the above comms
// task.
handles.push(scope.spawn(move || {
broadcast::Instance::new(from_algo_tx,
to_algo_rx,
None,
num_nodes,
i)
.run()
}));
}
// Collect the values computed by broadcast instances.
let final_result = handles.into_iter().fold(Ok(()), |result, h| {
if result.is_ok() {
match h.join() {
Ok(v) => {
debug!("Received value {:?}", v);
values.insert(v);
Ok(())
},
Err(e) => Err(Error::Broadcast(e))
}
}
else {
result
}
}).and_then(|_| Ok(values));
// Stop the comms tasks.
for tx in comms_stop_txs {
tx.send(()).map_err(|e| {
error!("{}", e);
}).unwrap();
}
// NEW START
let mut stop = false;
let mut mq: MessageQueue<TestAlgoError> = MessageQueue::new(
HashMap::new() // FIXME: TX handles to comms tasks
);
// Set the initial state of the message loop to Processing.
let mut loop_result = Ok(MessageLoopState::Processing(
VecDeque::new()
));
while !stop {
match loop_result {
Ok(MessageLoopState::Processing(msgs)) => {
// Send messages to remote nodes. FIXME.
// Deliver all messages locally. Message handlers queue
// local messages and output messages to remote nodes.
loop_result = mq.deliver();
}
Ok(MessageLoopState::Finished) => {
stop = true;
}
Err(ref e) => {
error!("Error: {:?}", e);
stop = true;
}
}
}
// NEW END
final_result
})
fn send_remote(&self, messages: &VecDeque<RemoteMessage>) {
// FIXME
}
}
pub fn handle_message(&self, m: messaging::RoutedMessage) ->
Result<(), Error>
{
Ok(())
}
fn send_remote(messages: &VecDeque<RemoteMessage>) {
// FIXME
}
#[derive(Clone, Debug)]
@ -230,69 +96,30 @@ impl From<broadcast::Error> for Error {
fn from(e: broadcast::Error) -> Error { Error::Broadcast(e) }
}
#[derive(Clone, Hash, PartialEq, Eq)]
pub struct TestValue {
pub value: String
}
impl Debug for TestValue {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.value)?;
Ok(())
}
}
/// `TestValue: merkle::Hashable` is derived from `TestValue: AsRef<[u8]>`.
impl AsRef<[u8]> for TestValue {
fn as_ref(&self) -> &[u8] {
self.value.as_ref()
}
}
impl From<Vec<u8>> for TestValue {
fn from(bytes: Vec<u8>) -> TestValue {
TestValue {
value: String::from_utf8(bytes).expect("Found invalid UTF-8")
// conversion from UTF-8 often panics:
// String::from_utf8(bytes).expect("Found invalid UTF-8")
}
}
}
impl From<TestValue> for Vec<u8> {
fn from(v: TestValue) -> Vec<u8> {
match v {
TestValue { value } => {
value.as_bytes().to_vec()
}
}
}
}
fn test_value_fmt(n: usize) -> TestValue {
TestValue {
value: format!("-{}-{}-{}-", n, n, n)
}
fn proposed_value(n: usize) -> ProposedValue {
let b: u8 = (n & 0xff) as u8;
vec![b; 10]
}
/// Creates a vector of test nodes but does not run them.
fn create_test_nodes<'a>(num_nodes: usize,
net: &'a NetSim<Message<Vec<u8>>>) ->
Vec<TestNode<'a>>
fn create_test_nodes(num_nodes: usize,
net: &NetSim<Message<Vec<u8>>>) ->
Vec<TestNode>
{
let mut nodes = Vec::new();
for n in 0..num_nodes {
let value = test_value_fmt(n);
let mut txs = Vec::new();
let mut rxs = Vec::new();
let value = proposed_value(n);
let mut txs = HashMap::new();
let mut rxs = HashMap::new();
// Set up comms channels to other nodes.
for m in 0..num_nodes {
if n == m {
// Skip the channel back to the node itself.
continue;
}
txs.push(net.tx(n, m));
rxs.push(net.rx(m, n));
let addr = format!("127.0.0.1:{}", m).parse().unwrap();
txs.insert(addr, net.tx(n, m));
rxs.insert(addr, net.rx(m, n));
}
nodes.push(TestNode::new(n, num_nodes, txs, rxs, Some(value)));
}
@ -319,53 +146,11 @@ fn test_4_broadcast_nodes() {
let nodes = create_test_nodes(NUM_NODES, &net);
crossbeam::scope(|scope| {
let mut handles = Vec::new();
let mut messaging_stop_txs = Vec::new();
let mut msg_handles = Vec::new();
for node in nodes {
// Start a local messaging service on the simulated node.
let messaging: Messaging<Vec<u8>> =
Messaging::new(NUM_NODES);
// Take the handle to receive the result after the thread finishes.
msg_handles.push(messaging.spawn(scope));
// Take the thread control handle.
messaging_stop_txs.push(messaging.stop_tx());
handles.push(scope.spawn(move || {
node.run(messaging)
}));
}
// Compare the set of values returned by broadcast against the expected
// set.
for h in handles {
match h.join() {
Err(Error::NotImplemented) => panic!(),
Err(err) => panic!("Error: {:?}", err),
Ok(v) => {
panic!("End of test");
let mut expected = HashSet::new();
for n in 0..NUM_NODES {
expected.insert(test_value_fmt(n));
}
debug!("Finished with values {:?}", v);
assert_eq!(v, expected);
},
}
}
// Stop all messaging tasks.
for tx in messaging_stop_txs {
tx.send(()).map_err(|e| {
error!("{}", e);
}).unwrap();
}
for (i, h) in msg_handles.into_iter().enumerate() {
match h.join() {
Ok(()) => debug!("Messaging[{}] stopped OK", i),
Err(e) => debug!("Messaging[{}] error: {:?}", i, e)
}
scope.spawn(move || {
debug!("Running {:?}", node);
node.run().unwrap();
});
}
});
}

View File

@ -38,18 +38,18 @@ impl<Message: Clone + Send + Sync> NetSim<Message> {
}
/// The TX side of a channel from node `src` to node `dst`.
pub fn tx(&self, src: usize, dst: usize) -> &Sender<Message> {
pub fn tx(&self, src: usize, dst: usize) -> Sender<Message> {
assert!(src < self.num_nodes);
assert!(dst < self.num_nodes);
&self.txs[src * self.num_nodes + dst]
self.txs[src * self.num_nodes + dst].clone()
}
/// The RX side of a channel from node `src` to node `dst`.
pub fn rx(&self, src: usize, dst: usize) -> &Receiver<Message> {
pub fn rx(&self, src: usize, dst: usize) -> Receiver<Message> {
assert!(src < self.num_nodes);
assert!(dst < self.num_nodes);
&self.rxs[src * self.num_nodes + dst]
self.rxs[src * self.num_nodes + dst].clone()
}
}