test prototype of a message loop implementation

This commit is contained in:
Vladimir Komendantskiy 2018-04-18 21:06:54 +01:00
parent 7b82228b57
commit f801f0c1b0
2 changed files with 116 additions and 8 deletions

View File

@ -1,5 +1,5 @@
//! The local message delivery system.
use std::collections::{HashSet, HashMap};
use std::collections::{HashSet, HashMap, VecDeque};
use std::fmt::Debug;
use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel;
@ -9,6 +9,7 @@ use proto::Message;
/// Type of algorithm primitive used in HoneyBadgerBFT.
///
/// TODO: Add the epoch parameter?
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Algorithm {
/// Encryption stage.
Encryption,
@ -23,9 +24,9 @@ pub enum Algorithm {
}
/// Type of proposed (encrypted) value for consensus.
type ProposedValue = Vec<u8>;
pub type ProposedValue = Vec<u8>;
/// Messages sent between algorithm instances.
/// Kinds of messages sent between algorithm instances.
pub enum AlgoMessage {
/// Asynchronous common subset input.
CommonSubsetInput(ProposedValue),
@ -37,6 +38,7 @@ pub enum AlgoMessage {
Agreement(bool)
}
/// A message sent between algorithm instances.
pub struct RoutedMessage {
/// Identifier of the algorithm that sent the message.
src: Algorithm,
@ -45,12 +47,86 @@ pub struct RoutedMessage {
message: AlgoMessage
}
pub struct MessageRouting {
message_handlers: HashMap<Algorithm, Box<FnMut()>>
#[derive(PartialEq, Eq)]
pub enum MessageLoopState {
Processing,
Finished
}
impl MessageRouting {
pub fn add_algo(&mut self, algo: Algorithm, handler: Box<FnMut()>) {
impl MessageLoopState {
pub fn is_processing(&self) -> bool {
if let MessageLoopState::Processing = self {
true
}
else {
false
}
}
}
/// The queue functionality for messages sent between algorithm instances.
pub struct MessageQueue<HandlerError: AlgoError> {
algos: HashMap<Algorithm, Box<Fn(&AlgoMessage) ->
Result<MessageLoopState, HandlerError>>>,
queue: VecDeque<RoutedMessage>
}
impl<HandlerError: AlgoError> MessageQueue<HandlerError> {
pub fn new() -> Self {
MessageQueue {
algos: HashMap::new(),
queue: VecDeque::new()
}
}
/// Registers a handler for messages sent to the given algorithm.
pub fn insert_algo(&mut self, algo: Algorithm,
handler: Box<Fn(&AlgoMessage) ->
Result<MessageLoopState, HandlerError>>)
{
let _ = self.algos.insert(algo, handler).unwrap();
}
/// Unregisters the handler for messages sent to the given algorithm.
pub fn remove_algo(&mut self, algo: &Algorithm) {
let _ = self.algos.remove(algo).unwrap();
}
/// Places a message at the end of the queue for routing to the destination
/// later.
pub fn push(&mut self, message: RoutedMessage) {
self.queue.push_back(message);
}
/// Removes and returns the message from the front of the queue if the queue
/// is not empty.
pub fn pop(&mut self) -> Option<RoutedMessage> {
self.queue.pop_front()
}
/// Message delivery routine.
pub fn deliver(&mut self) -> Result<MessageLoopState, Error> {
let mut result = Ok(MessageLoopState::Processing);
let mut queue_empty = false;
while !queue_empty && result.is_ok() {
if let Some(RoutedMessage {
src: ref _src,
ref dst,
ref message
}) = self.pop() {
if let Some(handler) = self.algos.get(dst) {
result = handler(message).map_err(Error::from);
}
else {
result = Err(Error::NoSuchDestination);
}
}
else {
queue_empty = true;
}
}
result
}
}
@ -279,12 +355,23 @@ impl<T: Clone + Debug + Send + Sync> Messaging<T> {
}
}
/// Class of algorithm error types.
pub trait AlgoError {
fn to_str(&self) -> &'static str;
}
#[derive(Clone, Debug)]
pub enum Error {
NoSuchDestination,
AlgoError(&'static str),
NoSuchTarget,
SendError,
}
impl<E: AlgoError> From<E> for Error {
fn from(e: E) -> Error { Error::AlgoError(e.to_str()) }
}
impl<T> From<crossbeam_channel::SendError<T>> for Error {
fn from(_: crossbeam_channel::SendError<T>) -> Error { Error::SendError }
}

View File

@ -21,7 +21,9 @@ use crossbeam_channel::{bounded, Sender, Receiver};
use hbbft::proto::*;
use hbbft::messaging;
use hbbft::messaging::{Messaging, SourcedMessage};
use hbbft::messaging::{AlgoError, Algorithm, ProposedValue, AlgoMessage,
MessageLoopState, MessageQueue,
Messaging, SourcedMessage};
use hbbft::broadcast;
use netsim::NetSim;
@ -261,11 +263,30 @@ fn create_test_nodes<'a>(num_nodes: usize,
nodes
}
#[derive(Debug)]
enum TestAlgoError {
TestError
}
impl AlgoError for TestAlgoError {
fn to_str(&self) -> &'static str {
"TestError"
}
}
#[test]
fn test_4_broadcast_nodes() {
simple_logger::init_with_level(log::Level::Debug).unwrap();
const NUM_NODES: usize = 4;
let mut stop = false;
let mut mq: MessageQueue<TestAlgoError> = MessageQueue::new();
let mut loop_result = Ok(MessageLoopState::Processing);
while loop_result.is_ok() && loop_result.unwrap().is_processing() {
loop_result = mq.deliver();
}
let net: NetSim<Message<Vec<u8>>> = NetSim::new(NUM_NODES);
let nodes = create_test_nodes(NUM_NODES, &net);