introduced a message handler trait and corrected the type of message handler container

This commit is contained in:
Vladimir Komendantskiy 2018-04-24 11:29:13 +01:00
parent 629c54494c
commit 36cb880186
3 changed files with 137 additions and 96 deletions

View File

@ -12,9 +12,24 @@ use reed_solomon_erasure as rse;
use reed_solomon_erasure::ReedSolomon;
use crossbeam_channel::{Sender, Receiver, SendError, RecvError};
use messaging::{Target, TargetedMessage, SourcedMessage};
use messaging::{Target, TargetedMessage, SourcedMessage,
ProposedValue};
type MerkleValue = Vec<u8>;
/*
pub struct Broadcast {
handler: Box<Fn(&QMessage, Sender<QMessage>) ->
Result<MessageLoopState, HandlerError>>
}
impl Broadcast {
pub fn new() -> Self {
}
pub fn handle(&self) -> Box<Fn(&QMessage, Sender<QMessage>) ->
Result<MessageLoopState, HandlerError>> {
}
}
*/
/// Broadcast algorithm instance.
///
@ -26,9 +41,9 @@ type MerkleValue = Vec<u8>;
/// might become part of the message for this to work.
pub struct Instance<'a, T: 'a + Clone + Debug + Send + Sync> {
/// The transmit side of the channel to comms threads.
tx: &'a Sender<TargetedMessage<MerkleValue>>,
tx: &'a Sender<TargetedMessage<ProposedValue>>,
/// The receive side of the channel from comms threads.
rx: &'a Receiver<SourcedMessage<MerkleValue>>,
rx: &'a Receiver<SourcedMessage<ProposedValue>>,
/// Value to be broadcast.
broadcast_value: Option<T>,
/// This instance's index for identification against its comms task.
@ -43,8 +58,8 @@ impl<'a, T: Clone + Debug + Hashable + Send + Sync
+ Into<Vec<u8>> + From<Vec<u8>>>
Instance<'a, T>
{
pub fn new(tx: &'a Sender<TargetedMessage<MerkleValue>>,
rx: &'a Receiver<SourcedMessage<MerkleValue>>,
pub fn new(tx: &'a Sender<TargetedMessage<ProposedValue>>,
rx: &'a Receiver<SourcedMessage<ProposedValue>>,
broadcast_value: Option<T>,
num_nodes: usize,
node_index: usize) ->
@ -94,7 +109,7 @@ pub enum Error {
Threading,
ProofConstructionFailed,
ReedSolomon(rse::Error),
Send(SendError<TargetedMessage<MerkleValue>>),
Send(SendError<TargetedMessage<ProposedValue>>),
Recv(RecvError)
}
@ -103,9 +118,9 @@ impl From<rse::Error> for Error
fn from(err: rse::Error) -> Error { Error::ReedSolomon(err) }
}
impl From<SendError<TargetedMessage<MerkleValue>>> for Error
impl From<SendError<TargetedMessage<ProposedValue>>> for Error
{
fn from(err: SendError<TargetedMessage<MerkleValue>>) -> Error { Error::Send(err) }
fn from(err: SendError<TargetedMessage<ProposedValue>>) -> Error { Error::Send(err) }
}
impl From<RecvError> for Error
@ -119,9 +134,9 @@ impl From<RecvError> for Error
/// need to be sent anywhere. It is returned to the broadcast instance and gets
/// recorded immediately.
fn send_shards<'a, T>(value: T,
tx: &'a Sender<TargetedMessage<MerkleValue>>,
tx: &'a Sender<TargetedMessage<ProposedValue>>,
coding: &ReedSolomon) ->
Result<Proof<MerkleValue>, Error>
Result<Proof<ProposedValue>, Error>
where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
{
let data_shard_num = coding.data_shard_count();
@ -161,7 +176,7 @@ where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
debug!("Shards: {:?}", shards);
let shards_t: Vec<MerkleValue> =
let shards_t: Vec<ProposedValue> =
shards.into_iter().map(|s| s.to_vec()).collect();
// Convert the Merkle tree into a partial binary tree for later
@ -195,8 +210,8 @@ where T: Clone + Debug + Hashable + Send + Sync + Into<Vec<u8>> + From<Vec<u8>>
}
/// The main loop of the broadcast task.
fn inner_run<'a, T>(tx: &'a Sender<TargetedMessage<MerkleValue>>,
rx: &'a Receiver<SourcedMessage<MerkleValue>>,
fn inner_run<'a, T>(tx: &'a Sender<TargetedMessage<ProposedValue>>,
rx: &'a Receiver<SourcedMessage<ProposedValue>>,
broadcast_value: Option<T>,
node_index: usize,
num_nodes: usize,
@ -398,7 +413,7 @@ where T: Clone + Debug + Hashable + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>
// Recompute the Merkle tree root.
//
// Collect shards for tree construction.
let mut shards: Vec<MerkleValue> = Vec::new();
let mut shards: Vec<ProposedValue> = Vec::new();
for l in leaf_values.iter() {
if let Some(ref v) = *l {
shards.push(v.to_vec());
@ -423,7 +438,7 @@ where T: Clone + Debug + Hashable + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>
/// Concatenates the first `n` leaf values of a Merkle tree `m` in one value of
/// type `T`. This is useful for reconstructing the data value held in the tree
/// and forgetting the leaves that contain parity information.
fn glue_shards<T>(m: MerkleTree<MerkleValue>, n: usize) -> T
fn glue_shards<T>(m: MerkleTree<ProposedValue>, n: usize) -> T
where T: From<Vec<u8>> + Into<Vec<u8>>
{
let mut t: Vec<u8> = Vec::new();
@ -491,6 +506,6 @@ fn index_of_path(mut path: Vec<bool>) -> usize {
}
/// Computes the Merkle tree leaf index of a value in a given proof.
fn index_of_proof(p: &Proof<MerkleValue>) -> usize {
fn index_of_proof(p: &Proof<ProposedValue>) -> usize {
index_of_path(path_of_lemma(&p.lemma))
}

View File

@ -1,15 +1,16 @@
//! The local message delivery system.
use std::collections::{HashSet, HashMap, VecDeque};
use std::fmt::Debug;
use std::mem;
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::RwLock;
use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel;
use crossbeam_channel::{bounded, unbounded, Sender, Receiver};
use proto::Message;
type NodeUid = SocketAddr;
/// Unique ID of a node.
pub type NodeUid = SocketAddr;
/// Type of algorithm primitive used in HoneyBadgerBFT.
///
@ -23,9 +24,9 @@ pub enum Algorithm {
/// Asynchronous Common Subset.
CommonSubset,
/// Reliable Broadcast instance.
Broadcast(usize),
Broadcast(NodeUid),
/// Binary Agreement instance.
Agreement(usize),
Agreement(NodeUid),
}
/// Type of proposed (encrypted) value for consensus.
@ -45,8 +46,6 @@ pub enum AlgoMessage {
/// A message sent between algorithm instances.
pub struct LocalMessage {
// /// Identifier of the algorithm that sent the message.
// src: Algorithm,
/// Identifier of the message destination algorithm.
dst: Algorithm,
/// Payload
@ -59,20 +58,21 @@ pub struct LocalMessage {
/// 1) `All`: all nodes if sent to socket tasks, or all local algorithm
/// instances if received from socket tasks.
///
/// 2) `Node(i)`: node `i` or local algorithm instances with the node index `i`.
/// 2) `Node(i)`: node `i` or local algorithm instances with the node ID `i`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteNode {
All,
Node(NodeUid)
}
/// Message with a designated target.
/// Message to or from a remote node.
#[derive(Clone, Debug, PartialEq)]
pub struct RemoteMessage {
pub node: RemoteNode,
pub message: Message<ProposedValue>
}
/// The union type of local and remote messages.
pub enum QMessage {
Local(LocalMessage),
Remote(RemoteMessage)
@ -80,8 +80,8 @@ pub enum QMessage {
/// States of the message loop consided as an automaton with output. There is
/// one exit state `Finished` and one transitional (also initial) state
/// `Processing` whose argument is a queue of messages that are output in order
/// to be sent to remote nodes.
/// `Processing` whose argument is an output queue of messages to be sent to
/// remote nodes.
#[derive(Clone, PartialEq)]
pub enum MessageLoopState {
Processing(VecDeque<RemoteMessage>),
@ -112,12 +112,22 @@ impl MessageLoopState {
}
}
/// Abstract type of message handler callback. A callback function has two
/// arguments: the sent message and the TX handle to send replies back to the
/// message loop. A call to the function returns either a new message loop state
/// - either `Finished` or a state with outgoing messages to remote nodes - or
/// an error.
pub trait Handler<HandlerError: AlgoError>: Send + Sync {
fn handle(&self, m: QMessage, tx: Sender<QMessage>) ->
Result<MessageLoopState, HandlerError>;
}
/// The queue functionality for messages sent between algorithm instances.
pub struct MessageQueue<HandlerError: AlgoError> {
pub struct MessageLoop<'a, HandlerError: 'a + AlgoError> {
/// Algorithm message handlers. Every message handler receives a message and
/// the TX handle of the incoming message queue.
algos: HashMap<Algorithm, Box<Fn(&QMessage, Sender<QMessage>) ->
Result<MessageLoopState, HandlerError>>>,
/// the TX handle of the incoming message queue for sending replies back to
/// the message loop.
algos: RwLock<HashMap<Algorithm, &'a Handler<HandlerError>>>,
/// The queue of local and remote messages.
queue: VecDeque<QMessage>,
/// TX handle of the message queue.
@ -130,14 +140,13 @@ pub struct MessageQueue<HandlerError: AlgoError> {
remote_txs: HashMap<NodeUid, Sender<Message<ProposedValue>>>
}
impl<HandlerError: AlgoError> MessageQueue<HandlerError> {
pub fn new(remote_txs: HashMap<NodeUid,
Sender<Message<ProposedValue>>>) ->
impl<'a, HandlerError: AlgoError> MessageLoop<'a, HandlerError> {
pub fn new(remote_txs: HashMap<NodeUid, Sender<Message<ProposedValue>>>) ->
Self
{
let (queue_tx, queue_rx) = unbounded();
MessageQueue {
algos: HashMap::new(),
MessageLoop {
algos: RwLock::new(HashMap::new()),
queue: VecDeque::new(),
queue_tx,
queue_rx,
@ -150,16 +159,15 @@ impl<HandlerError: AlgoError> MessageQueue<HandlerError> {
}
/// Registers a handler for messages sent to the given algorithm.
pub fn insert_algo(&mut self, algo: Algorithm,
handler: Box<Fn(&QMessage, Sender<QMessage>) ->
Result<MessageLoopState, HandlerError>>)
pub fn insert_algo(&'a self, algo: Algorithm,
handler: &'a Handler<HandlerError>)
{
let _ = self.algos.insert(algo, handler).unwrap();
let _ = self.algos.write().unwrap().insert(algo, handler).unwrap();
}
/// Unregisters the handler for messages sent to the given algorithm.
pub fn remove_algo(&mut self, algo: &Algorithm) {
let _ = self.algos.remove(algo).unwrap();
pub fn remove_algo(&self, algo: &Algorithm) {
let _ = self.algos.write().unwrap().remove(algo).unwrap();
}
/// Places a message at the end of the queue for routing to the destination
@ -177,32 +185,36 @@ impl<HandlerError: AlgoError> MessageQueue<HandlerError> {
// }
/// The message loop.
pub fn message_loop(&mut self) -> Result<MessageLoopState, Error>
pub fn run(&mut self) -> Result<MessageLoopState, Error>
{
let mut result = Ok(MessageLoopState::Processing(VecDeque::new()));
while let Ok(mut state) = result {
// Send any outgoing messages to remote nodes using the provided
// function.
if let MessageLoopState::Processing(messages) = state {
// TODO: error handling
self.send_remote(messages);
state = MessageLoopState::Processing(VecDeque::new())
(if let MessageLoopState::Processing(messages) = &state {
self.send_remote(messages)
.map(|_| MessageLoopState::Processing(VecDeque::new()))
.map_err(Error::from)
}
else {
Ok(MessageLoopState::Finished)
})?;
// Receive local and remote messages.
if let Ok(m) = self.queue_rx.recv() { match m {
if let Ok(m) = self.queue_rx.recv() { result = match m {
QMessage::Local(LocalMessage {
dst,
message
}) => {
result = if let Some(handler) = self.algos.get(&dst) {
let mut new_result = handler(&QMessage::Local(
LocalMessage {
dst,
message
}), self.queue_tx.clone()
).map_err(Error::from);
if let Some(handler) = self.algos.read().unwrap().get(&dst) {
let mut new_result =
handler.handle(QMessage::Local(
LocalMessage {
dst,
message
}), self.queue_tx.clone()
).map_err(Error::from);
if let Ok(ref mut new_state) = new_result {
state.append(new_state);
Ok(state)
@ -224,18 +236,19 @@ impl<HandlerError: AlgoError> MessageQueue<HandlerError> {
// Multicast the message to all algorithm instances,
// collecting output messages iteratively and appending them
// to result.
result = self.algos.iter()
self.algos.read().unwrap().iter()
.fold(Ok(state),
|result1, (_, handler)| {
if let Ok(mut state1) = result1 {
handler(&QMessage::Remote(RemoteMessage {
node: node.clone(),
message: message.clone()
}), self.queue_tx.clone()
).map(|ref mut state2| {
state1.append(state2);
state1
}).map_err(Error::from)
handler.handle(
QMessage::Remote(RemoteMessage {
node: node.clone(),
message: message.clone()
}), self.queue_tx.clone()
).map(|ref mut state2| {
state1.append(state2);
state1
}).map_err(Error::from)
}
else {
result1
@ -248,8 +261,7 @@ impl<HandlerError: AlgoError> MessageQueue<HandlerError> {
}
/// Send a message queue to remote nodes.
fn send_remote(&mut self,
messages: VecDeque<RemoteMessage>) ->
fn send_remote(&mut self, messages: &VecDeque<RemoteMessage>) ->
Result<(), Error>
{
messages.iter().fold(Ok(()), |result, m| {
@ -512,7 +524,7 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
}
/// Class of algorithm error types.
pub trait AlgoError {
pub trait AlgoError: Send + Sync {
fn to_str(&self) -> &'static str;
}

View File

@ -16,75 +16,89 @@ use std::fmt;
use std::fmt::Debug;
use std::io;
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::RwLock;
use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel::{bounded, Sender, Receiver};
use hbbft::proto::*;
use hbbft::messaging;
use hbbft::messaging::{AlgoError, Algorithm, ProposedValue, AlgoMessage,
MessageLoopState, MessageQueue, RemoteMessage};
use hbbft::messaging::{QMessage, NodeUid, AlgoError, Algorithm, ProposedValue,
AlgoMessage, Handler,
MessageLoopState, MessageLoop, RemoteMessage};
use hbbft::broadcast;
use netsim::NetSim;
/// This is a structure to start a consensus node.
#[derive(Debug)]
pub struct TestNode {
pub struct TestNode<'a> {
/// Node identifier.
node_index: usize,
/// Total number of nodes.
num_nodes: usize,
/// TX handles indexed with the receiving node address. One handle for each
/// other node.
txs: HashMap<SocketAddr, Sender<Message<Vec<u8>>>>,
txs: HashMap<NodeUid, Sender<Message<ProposedValue>>>,
/// RX handle indexed with the transmitting node address. One handle for
/// each other node.
rxs: HashMap<SocketAddr, Receiver<Message<Vec<u8>>>>,
rxs: HashMap<NodeUid, Receiver<Message<ProposedValue>>>,
/// Optionally, a value to be broadcast by this node.
value: Option<ProposedValue>
value: Option<ProposedValue>,
/// Messaging system.
message_loop: MessageLoop<'a, TestAlgoError>
}
impl TestNode
impl<'a> TestNode<'a>
{
/// Consensus node constructor. It only initialises initial parameters.
pub fn new(node_index: usize,
num_nodes: usize,
txs: HashMap<SocketAddr, Sender<Message<Vec<u8>>>>,
rxs: HashMap<SocketAddr, Receiver<Message<Vec<u8>>>>,
txs: HashMap<NodeUid, Sender<Message<ProposedValue>>>,
rxs: HashMap<NodeUid, Receiver<Message<ProposedValue>>>,
value: Option<ProposedValue>) -> Self
{
TestNode {
node_index,
num_nodes,
txs,
txs: txs.clone(),
rxs,
value
value,
message_loop: MessageLoop::new(txs)
}
}
pub fn run(&self) ->
Result<HashSet<ProposedValue>, Error>
pub fn run(&'a self) -> Result<HashSet<ProposedValue>, Error>
{
let mut stop = false;
// FIXME: localise to the Node context.
// let f: fn(&VecDeque<RemoteMessage>) = self.send_remote;
let mut mq: MessageQueue<TestAlgoError> = MessageQueue::new(
self.txs.clone()
);
let node0_uid = "127.0.0.1:0".parse().unwrap();
self.message_loop.insert_algo(Algorithm::Broadcast(node0_uid), self);
Err(Error::NotImplemented)
}
fn send_remote(&self, messages: &VecDeque<RemoteMessage>) {
// FIXME
pub fn handler(&self, m: QMessage, tx: Sender<QMessage>) ->
Result<MessageLoopState, TestAlgoError>
{
Err(TestAlgoError::TestError)
}
}
fn send_remote(messages: &VecDeque<RemoteMessage>) {
impl<'a> Handler<TestAlgoError> for TestNode<'a> {
fn handle(&self, m: QMessage, tx: Sender<QMessage>) ->
Result<MessageLoopState, TestAlgoError>
{
self.handler(m, tx)
}
}
// FIXME
pub fn broadcast_handler(txs: RwLock<HashMap<NodeUid,
Sender<Message<ProposedValue
>
>
>
>,
m: &QMessage, tx: Sender<QMessage>) ->
Result<MessageLoopState, TestAlgoError>
{
Err(TestAlgoError::TestError)
}
#[derive(Clone, Debug)]
@ -108,7 +122,7 @@ fn node_addr(node_index: usize) -> SocketAddr {
/// Creates a vector of test nodes but does not run them.
fn create_test_nodes(num_nodes: usize,
net: &NetSim<Message<Vec<u8>>>) ->
net: &NetSim<Message<Vec<u8>>>) ->
Vec<TestNode>
{
let mut nodes = Vec::new();
@ -151,9 +165,9 @@ fn test_4_broadcast_nodes() {
let nodes = create_test_nodes(NUM_NODES, &net);
crossbeam::scope(|scope| {
for node in nodes {
for node in nodes.iter() {
scope.spawn(move || {
debug!("Running {:?}", node);
debug!("Running {:?}", node.node_index);
node.run().unwrap();
});
}