rewrote part of Common Subset to call Broadcast via a callback handler

This commit is contained in:
Vladimir Komendantskiy 2018-05-01 15:08:40 +01:00
parent d7882bae9c
commit 2daaf7baf2
1 changed files with 35 additions and 16 deletions

View File

@ -4,8 +4,16 @@ use crossbeam_channel::{SendError, Sender};
use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::RwLock; use std::sync::RwLock;
use broadcast;
use broadcast::Broadcast;
use messaging; use messaging;
use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, NodeUid, QMessage}; use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, RemoteMessage,
MessageLoopState, NodeUid, QMessage, ProposedValue};
pub enum CommonSubsetMessage {
}
struct CommonSubsetState { struct CommonSubsetState {
agreement_inputs: HashMap<NodeUid, bool>, agreement_inputs: HashMap<NodeUid, bool>,
@ -17,6 +25,7 @@ pub struct CommonSubset {
uid: NodeUid, uid: NodeUid,
num_nodes: usize, num_nodes: usize,
num_faulty_nodes: usize, num_faulty_nodes: usize,
broadcast_instances: HashMap<NodeUid, Broadcast>,
state: RwLock<CommonSubsetState>, state: RwLock<CommonSubsetState>,
} }
@ -28,6 +37,8 @@ impl CommonSubset {
uid, uid,
num_nodes, num_nodes,
num_faulty_nodes, num_faulty_nodes,
// FIXME: instantiate broadcast instances
broadcast_instances: HashMap::new(),
state: RwLock::new(CommonSubsetState { state: RwLock::new(CommonSubsetState {
agreement_inputs: HashMap::new(), agreement_inputs: HashMap::new(),
agreement_true_outputs: HashSet::new(), agreement_true_outputs: HashSet::new(),
@ -36,22 +47,21 @@ impl CommonSubset {
} }
} }
pub fn on_message<E>(&self, m: QMessage, tx: &Sender<QMessage>) -> Result<MessageLoopState, E> /// Common Subset input message handler. It receives a value for broadcast
where /// and redirects it to the corresponding broadcast instance.
E: From<Error> + From<messaging::Error>, pub fn on_message_input(&self, value: ProposedValue) ->
Result<VecDeque<RemoteMessage>, Error>
{ {
match m { // Upon receiving input v_i , input v_i to RBC_i. See Figure 2.
QMessage::Local(LocalMessage { message, .. }) => { if let Some(instance) = self.broadcast_instances.get(&self.uid) {
let no_outgoing = Ok(MessageLoopState::Processing(VecDeque::new())); instance.handle_broadcast_value(value)
}
match message { else {
// Upon receiving input v_i , input v_i to RBC_i. See Figure 2. Err(Error::NoSuchBroadcastInstance(self.uid))
AlgoMessage::CommonSubsetInput(value) => { }
tx.send(QMessage::Local(LocalMessage { }
dst: Algorithm::Broadcast(self.uid), }
message: AlgoMessage::BroadcastInput(value), /*
})).map_err(Error::from)?;
no_outgoing no_outgoing
} }
@ -125,12 +135,15 @@ where
self.on_message(m, &tx) self.on_message(m, &tx)
} }
} }
*/
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum Error { pub enum Error {
UnexpectedMessage, UnexpectedMessage,
NotImplemented, NotImplemented,
NoSuchBroadcastInstance(NodeUid),
Send(SendError<QMessage>), Send(SendError<QMessage>),
Broadcast(broadcast::Error),
} }
impl From<SendError<QMessage>> for Error { impl From<SendError<QMessage>> for Error {
@ -138,3 +151,9 @@ impl From<SendError<QMessage>> for Error {
Error::Send(err) Error::Send(err)
} }
} }
impl From<broadcast::Error> for Error {
fn from(err: broadcast::Error) -> Error {
Error::Broadcast(err)
}
}