diff --git a/src/messaging.rs b/src/messaging.rs index 2b9b1bf..9a41d57 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -1,5 +1,5 @@ //! The local message delivery system. -use std::collections::{HashSet, HashMap}; +use std::collections::{HashSet, HashMap, VecDeque}; use std::fmt::Debug; use crossbeam::{Scope, ScopedJoinHandle}; use crossbeam_channel; @@ -9,6 +9,7 @@ use proto::Message; /// Type of algorithm primitive used in HoneyBadgerBFT. /// /// TODO: Add the epoch parameter? +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum Algorithm { /// Encryption stage. Encryption, @@ -23,9 +24,9 @@ pub enum Algorithm { } /// Type of proposed (encrypted) value for consensus. -type ProposedValue = Vec; +pub type ProposedValue = Vec; -/// Messages sent between algorithm instances. +/// Kinds of messages sent between algorithm instances. pub enum AlgoMessage { /// Asynchronous common subset input. CommonSubsetInput(ProposedValue), @@ -37,6 +38,7 @@ pub enum AlgoMessage { Agreement(bool) } +/// A message sent between algorithm instances. pub struct RoutedMessage { /// Identifier of the algorithm that sent the message. src: Algorithm, @@ -45,12 +47,86 @@ pub struct RoutedMessage { message: AlgoMessage } -pub struct MessageRouting { - message_handlers: HashMap> +#[derive(PartialEq, Eq)] +pub enum MessageLoopState { + Processing, + Finished } -impl MessageRouting { - pub fn add_algo(&mut self, algo: Algorithm, handler: Box) { +impl MessageLoopState { + pub fn is_processing(&self) -> bool { + if let MessageLoopState::Processing = self { + true + } + else { + false + } + } +} + +/// The queue functionality for messages sent between algorithm instances. +pub struct MessageQueue { + algos: HashMap + Result>>, + queue: VecDeque +} + +impl MessageQueue { + pub fn new() -> Self { + MessageQueue { + algos: HashMap::new(), + queue: VecDeque::new() + } + } + + /// Registers a handler for messages sent to the given algorithm. + pub fn insert_algo(&mut self, algo: Algorithm, + handler: Box + Result>) + { + let _ = self.algos.insert(algo, handler).unwrap(); + } + + /// Unregisters the handler for messages sent to the given algorithm. + pub fn remove_algo(&mut self, algo: &Algorithm) { + let _ = self.algos.remove(algo).unwrap(); + } + + /// Places a message at the end of the queue for routing to the destination + /// later. + pub fn push(&mut self, message: RoutedMessage) { + self.queue.push_back(message); + } + + /// Removes and returns the message from the front of the queue if the queue + /// is not empty. + pub fn pop(&mut self) -> Option { + self.queue.pop_front() + } + + /// Message delivery routine. + pub fn deliver(&mut self) -> Result { + let mut result = Ok(MessageLoopState::Processing); + let mut queue_empty = false; + + while !queue_empty && result.is_ok() { + if let Some(RoutedMessage { + src: ref _src, + ref dst, + ref message + }) = self.pop() { + if let Some(handler) = self.algos.get(dst) { + result = handler(message).map_err(Error::from); + } + else { + result = Err(Error::NoSuchDestination); + } + } + else { + queue_empty = true; + } + } + result } } @@ -279,12 +355,23 @@ impl Messaging { } } +/// Class of algorithm error types. +pub trait AlgoError { + fn to_str(&self) -> &'static str; +} + #[derive(Clone, Debug)] pub enum Error { + NoSuchDestination, + AlgoError(&'static str), NoSuchTarget, SendError, } +impl From for Error { + fn from(e: E) -> Error { Error::AlgoError(e.to_str()) } +} + impl From> for Error { fn from(_: crossbeam_channel::SendError) -> Error { Error::SendError } } diff --git a/tests/broadcast.rs b/tests/broadcast.rs index fb3c5e1..e04338a 100644 --- a/tests/broadcast.rs +++ b/tests/broadcast.rs @@ -21,7 +21,9 @@ use crossbeam_channel::{bounded, Sender, Receiver}; use hbbft::proto::*; use hbbft::messaging; -use hbbft::messaging::{Messaging, SourcedMessage}; +use hbbft::messaging::{AlgoError, Algorithm, ProposedValue, AlgoMessage, + MessageLoopState, MessageQueue, + Messaging, SourcedMessage}; use hbbft::broadcast; use netsim::NetSim; @@ -261,11 +263,30 @@ fn create_test_nodes<'a>(num_nodes: usize, nodes } +#[derive(Debug)] +enum TestAlgoError { + TestError +} + +impl AlgoError for TestAlgoError { + fn to_str(&self) -> &'static str { + "TestError" + } +} + #[test] fn test_4_broadcast_nodes() { simple_logger::init_with_level(log::Level::Debug).unwrap(); const NUM_NODES: usize = 4; + let mut stop = false; + let mut mq: MessageQueue = MessageQueue::new(); + let mut loop_result = Ok(MessageLoopState::Processing); + + while loop_result.is_ok() && loop_result.unwrap().is_processing() { + loop_result = mq.deliver(); + } + let net: NetSim>> = NetSim::new(NUM_NODES); let nodes = create_test_nodes(NUM_NODES, &net);