From 2daaf7baf2d126519ffa888562d1d7d262cb46c6 Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Tue, 1 May 2018 15:08:40 +0100 Subject: [PATCH] rewrote part of Common Subset to call Broadcast via a callback handler --- src/common_subset.rs | 51 ++++++++++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/src/common_subset.rs b/src/common_subset.rs index 644a246..67db5b2 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -4,8 +4,16 @@ use crossbeam_channel::{SendError, Sender}; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::RwLock; +use broadcast; +use broadcast::Broadcast; + use messaging; -use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, NodeUid, QMessage}; +use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, RemoteMessage, + MessageLoopState, NodeUid, QMessage, ProposedValue}; + +pub enum CommonSubsetMessage { + +} struct CommonSubsetState { agreement_inputs: HashMap, @@ -17,6 +25,7 @@ pub struct CommonSubset { uid: NodeUid, num_nodes: usize, num_faulty_nodes: usize, + broadcast_instances: HashMap, state: RwLock, } @@ -28,6 +37,8 @@ impl CommonSubset { uid, num_nodes, num_faulty_nodes, + // FIXME: instantiate broadcast instances + broadcast_instances: HashMap::new(), state: RwLock::new(CommonSubsetState { agreement_inputs: HashMap::new(), agreement_true_outputs: HashSet::new(), @@ -36,22 +47,21 @@ impl CommonSubset { } } - pub fn on_message(&self, m: QMessage, tx: &Sender) -> Result - where - E: From + From, + /// Common Subset input message handler. It receives a value for broadcast + /// and redirects it to the corresponding broadcast instance. + pub fn on_message_input(&self, value: ProposedValue) -> + Result, Error> { - match m { - QMessage::Local(LocalMessage { message, .. }) => { - let no_outgoing = Ok(MessageLoopState::Processing(VecDeque::new())); - - match message { - // Upon receiving input v_i , input v_i to RBC_i. See Figure 2. - AlgoMessage::CommonSubsetInput(value) => { - tx.send(QMessage::Local(LocalMessage { - dst: Algorithm::Broadcast(self.uid), - message: AlgoMessage::BroadcastInput(value), - })).map_err(Error::from)?; - + // Upon receiving input v_i , input v_i to RBC_i. See Figure 2. + if let Some(instance) = self.broadcast_instances.get(&self.uid) { + instance.handle_broadcast_value(value) + } + else { + Err(Error::NoSuchBroadcastInstance(self.uid)) + } + } +} + /* no_outgoing } @@ -125,12 +135,15 @@ where self.on_message(m, &tx) } } +*/ #[derive(Clone, Debug)] pub enum Error { UnexpectedMessage, NotImplemented, + NoSuchBroadcastInstance(NodeUid), Send(SendError), + Broadcast(broadcast::Error), } impl From> for Error { @@ -138,3 +151,9 @@ impl From> for Error { Error::Send(err) } } + +impl From for Error { + fn from(err: broadcast::Error) -> Error { + Error::Broadcast(err) + } +}