diff --git a/src/common_subset.rs b/src/common_subset.rs index feaea5f..96bcd8d 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -3,6 +3,8 @@ use crossbeam_channel::{SendError, Sender}; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::RwLock; +use std::hash::Hash; +use std::fmt::{Debug, Display}; use agreement; use agreement::Agreement; @@ -11,12 +13,12 @@ use broadcast; use broadcast::{Broadcast, TargetedBroadcastMessage}; use messaging; -use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, NodeUid, +use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, ProposedValue, QMessage, RemoteMessage}; use proto::{BroadcastMessage, AgreementMessage}; -pub enum Message { +pub enum Input { /// Local message to initiate broadcast of a value. CommonSubset(ProposedValue), /// Message from a remote node `uid` to the broadcast instance `uid`. @@ -25,22 +27,26 @@ pub enum Message { Agreement(NodeUid, AgreementMessage), } -struct CommonSubsetState { +pub enum Output { + Broadcast(TargetedBroadcastMessage) +} + +struct CommonSubsetState { agreement_inputs: HashMap, agreement_true_outputs: HashSet, agreements_without_input: HashSet, + broadcast_instances: HashMap>, + agreement_instances: HashMap, } -pub struct CommonSubset { +pub struct CommonSubset { uid: NodeUid, num_nodes: usize, num_faulty_nodes: usize, - broadcast_instances: HashMap>, - agreement_instances: HashMap, - state: RwLock, + state: RwLock>, } -impl CommonSubset { +impl CommonSubset { pub fn new(uid: NodeUid, num_nodes: usize, node_uids: HashSet) -> Self { let num_faulty_nodes = (num_nodes - 1) / 3; @@ -48,14 +54,14 @@ impl CommonSubset { uid, num_nodes, num_faulty_nodes, - // FIXME: instantiate broadcast instances - broadcast_instances: HashMap::new(), - // FIXME: instantiate agreement instances - agreement_instances: HashMap::new(), - state: RwLock::new(CommonSubsetState { - agreement_inputs: HashMap::new(), - agreement_true_outputs: HashSet::new(), - agreements_without_input: node_uids, + state: RwLock::new(CommonSubsetState { + agreement_inputs: HashMap::new(), + agreement_true_outputs: HashSet::new(), + agreements_without_input: node_uids, + // FIXME: instantiate broadcast instances + broadcast_instances: HashMap::new(), + // FIXME: instantiate agreement instances + agreement_instances: HashMap::new(), }), } } @@ -63,60 +69,69 @@ impl CommonSubset { /// Common Subset input message handler. It receives a value for broadcast /// and redirects it to the corresponding broadcast instance. pub fn on_proposed_value(&self, value: ProposedValue) -> - Result, Error> + Result>, Error> { // Upon receiving input v_i , input v_i to RBC_i. See Figure 2. - if let Some(instance) = self.broadcast_instances.get(&self.uid) { + let state = self.state.read().unwrap(); + if let Some(instance) = state.broadcast_instances.get(&self.uid) { Ok(instance .propose_value(value)? .into_iter() - .map(TargetedBroadcastMessage::into_remote_message) + .map(Output::Broadcast) .collect()) } else { - Err(Error::NoSuchBroadcastInstance(self.uid)) + Err(Error::NoSuchBroadcastInstance) } } /// Upon delivery of v_j from RBC_j, if input has not yet been provided to /// BA_j, then provide input 1 to BA_j. See Figure 11. - pub fn on_broadcast_output(&mut self, uid: NodeUid) -> + pub fn on_broadcast_output(&self, uid: NodeUid) -> Result<(), Error> { - if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) { + let mut state = self.state.write().unwrap(); + if let Some(agreement_instance) = state.agreement_instances.get_mut(&uid) { if agreement_instance.get_input().is_none() { agreement_instance.set_input(true); - let mut state = self.state.write().unwrap(); - state.agreements_without_input.remove(&uid); + // FIXME: implement this counter indirectly by filtering the + // list of Agreement instances + // + // state.agreements_without_input.remove(&uid); } Ok(()) } else { - Err(Error::NoSuchBroadcastInstance(self.uid)) + Err(Error::NoSuchBroadcastInstance) } } - pub fn handle_input(&self, message: Message) -> - Result, Error> + pub fn on_input(&self, message: Input) -> + Result>, Error> { match message { - Message::CommonSubset(value) => self.on_proposed_value(value), - Message::Broadcast(uid, bmessage) => { - if let Some(broadcast_instance) = self.broadcast_instances.get_mut(&uid) { - broadcast_instance.handle_broadcast_message(uid, &bmessage) + Input::CommonSubset(value) => self.on_proposed_value(value), + Input::Broadcast(uid, bmessage) => { + let mut state = self.state.read().unwrap(); + if let Some(broadcast_instance) = state.broadcast_instances.get(&uid) { + broadcast_instance.handle_broadcast_message(&uid, &bmessage) .map(|(value, queue)| { - if let Some(value) = value { + if let Some(_value) = value { + // FIXME: use `value` self.on_broadcast_output(uid); } queue + .into_iter() + .map(Output::Broadcast) + .collect() }) .map_err(Error::from) } else { - Err(Error::NoSuchBroadcastInstance(uid)) + Err(Error::NoSuchBroadcastInstance) } }, - Message::Agreement(_uid, _message) => { + Input::Agreement(_uid, _message) => { Err(Error::NotImplemented) } } @@ -180,7 +195,7 @@ where pub enum Error { UnexpectedMessage, NotImplemented, - NoSuchBroadcastInstance(NodeUid), + NoSuchBroadcastInstance, Send(SendError), Broadcast(broadcast::Error), }