From 389855839fb91321cfb282e408cf021545a81086 Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Wed, 2 May 2018 11:57:28 +0100 Subject: [PATCH] wrote CommonSubset::on_agreement_result and made related updates in agreement.rs --- src/agreement.rs | 23 +++++- src/common_subset.rs | 173 +++++++++++++++++-------------------------- 2 files changed, 89 insertions(+), 107 deletions(-) diff --git a/src/agreement.rs b/src/agreement.rs index 9499650..c8d8047 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -1,15 +1,30 @@ //! Binary Byzantine agreement protocol from a common coin protocol. +use std::collections::VecDeque; +use proto::AgreementMessage; + pub struct Agreement { input: Option } impl Agreement { - pub fn get_input(&self) -> Option { - self.input - } - pub fn set_input(&mut self, input: bool) { self.input = Some(input); } + + pub fn has_input(&self) -> bool { + self.input.is_some() + } + + /// Receive input from a remote node. + pub fn on_input(&self, _message: AgreementMessage) -> + Result, Error> + { + Err(Error::NotImplemented) + } +} + +#[derive(Clone, Debug)] +pub enum Error { + NotImplemented } diff --git a/src/common_subset.rs b/src/common_subset.rs index 96bcd8d..a88a101 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -1,8 +1,6 @@ //! Asynchronous Common Subset algorithm. -use crossbeam_channel::{SendError, Sender}; use std::collections::{HashMap, HashSet, VecDeque}; -use std::sync::RwLock; use std::hash::Hash; use std::fmt::{Debug, Display}; @@ -12,29 +10,25 @@ use agreement::Agreement; use broadcast; use broadcast::{Broadcast, TargetedBroadcastMessage}; -use messaging; -use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, - ProposedValue, QMessage, RemoteMessage}; +use messaging::ProposedValue; use proto::{BroadcastMessage, AgreementMessage}; +/// Input from a remote node to Common Subset. pub enum Input { - /// Local message to initiate broadcast of a value. - CommonSubset(ProposedValue), /// Message from a remote node `uid` to the broadcast instance `uid`. Broadcast(NodeUid, BroadcastMessage), /// Message from a remote node `uid` to the agreement instance `uid`. Agreement(NodeUid, AgreementMessage), } +/// Output from Common Subset to remote nodes. pub enum Output { Broadcast(TargetedBroadcastMessage) } struct CommonSubsetState { - agreement_inputs: HashMap, agreement_true_outputs: HashSet, - agreements_without_input: HashSet, broadcast_instances: HashMap>, agreement_instances: HashMap, } @@ -43,37 +37,34 @@ pub struct CommonSubset { uid: NodeUid, num_nodes: usize, num_faulty_nodes: usize, - state: RwLock>, + agreement_true_outputs: HashSet, + broadcast_instances: HashMap>, + agreement_instances: HashMap, } impl CommonSubset { - pub fn new(uid: NodeUid, num_nodes: usize, node_uids: HashSet) -> Self { + pub fn new(uid: NodeUid, num_nodes: usize) -> Self { let num_faulty_nodes = (num_nodes - 1) / 3; CommonSubset { uid, num_nodes, num_faulty_nodes, - state: RwLock::new(CommonSubsetState { - agreement_inputs: HashMap::new(), - agreement_true_outputs: HashSet::new(), - agreements_without_input: node_uids, - // FIXME: instantiate broadcast instances - broadcast_instances: HashMap::new(), - // FIXME: instantiate agreement instances - agreement_instances: HashMap::new(), - }), + agreement_true_outputs: HashSet::new(), + // FIXME: instantiate broadcast instances + broadcast_instances: HashMap::new(), + // FIXME: instantiate agreement instances + agreement_instances: HashMap::new(), } } /// Common Subset input message handler. It receives a value for broadcast /// and redirects it to the corresponding broadcast instance. - pub fn on_proposed_value(&self, value: ProposedValue) -> + pub fn send_proposed_value(&self, value: ProposedValue) -> Result>, Error> { // Upon receiving input v_i , input v_i to RBC_i. See Figure 2. - let state = self.state.read().unwrap(); - if let Some(instance) = state.broadcast_instances.get(&self.uid) { + if let Some(instance) = self.broadcast_instances.get(&self.uid) { Ok(instance .propose_value(value)? .into_iter() @@ -86,18 +77,12 @@ impl CommonSubset { /// Upon delivery of v_j from RBC_j, if input has not yet been provided to /// BA_j, then provide input 1 to BA_j. See Figure 11. - pub fn on_broadcast_output(&self, uid: NodeUid) -> + pub fn on_broadcast_result(&mut self, uid: NodeUid) -> Result<(), Error> { - let mut state = self.state.write().unwrap(); - if let Some(agreement_instance) = state.agreement_instances.get_mut(&uid) { - if agreement_instance.get_input().is_none() { + if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) { + if !agreement_instance.has_input() { agreement_instance.set_input(true); - - // FIXME: implement this counter indirectly by filtering the - // list of Agreement instances - // - // state.agreements_without_input.remove(&uid); } Ok(()) } @@ -106,104 +91,80 @@ impl CommonSubset { } } - pub fn on_input(&self, message: Input) -> + /// Receive input from a remote node. + pub fn on_input(&mut self, message: Input) -> Result>, Error> { match message { - Input::CommonSubset(value) => self.on_proposed_value(value), Input::Broadcast(uid, bmessage) => { - let mut state = self.state.read().unwrap(); - if let Some(broadcast_instance) = state.broadcast_instances.get(&uid) { - broadcast_instance.handle_broadcast_message(&uid, &bmessage) - .map(|(value, queue)| { - if let Some(_value) = value { - // FIXME: use `value` - self.on_broadcast_output(uid); - } - queue - .into_iter() - .map(Output::Broadcast) - .collect() - }) - .map_err(Error::from) - } - else { - Err(Error::NoSuchBroadcastInstance) + let mut instance_result = None; + let input_result = { + if let Some(broadcast_instance) = self.broadcast_instances.get(&uid) { + broadcast_instance.handle_broadcast_message(&uid, &bmessage) + .map(|(value, queue)| { + if let Some(value) = value { + instance_result = Some(value) + } + queue + .into_iter() + .map(Output::Broadcast) + .collect() + }) + .map_err(Error::from) + } + else { + Err(Error::NoSuchBroadcastInstance) + } + }; + if instance_result.is_some() { + self.on_broadcast_result(uid); } + input_result }, Input::Agreement(_uid, _message) => { + // FIXME: send the message to the Agreement instance and + // conditionally call `on_agreement_output` + Err(Error::NotImplemented) } } } -} - /* - // Upon delivery of value 1 from at least N − f instances of BA, - // provide input 0 to each instance of BA that has not yet been - // provided input. - AlgoMessage::AgreementOutput(uid, true) => { - let mut state = self.state.write().unwrap(); - state.agreement_true_outputs.insert(uid); + /// Callback to be invoked on receipt of a returned value of the Agreement + /// instance `uid`. + fn on_agreement_result(&mut self, uid: NodeUid, result: bool) { + // Upon delivery of value 1 from at least N − f instances of BA, provide + // input 0 to each instance of BA that has not yet been provided input. + if result { + self.agreement_true_outputs.insert(uid); - if state.agreement_true_outputs.len() - >= self.num_nodes - self.num_faulty_nodes - { - // FIXME: Avoid cloning the set. - for uid0 in state.agreements_without_input.clone() { - tx.send(QMessage::Local(LocalMessage { - dst: Algorithm::Agreement(uid0), - message: AlgoMessage::AgreementInput(false), - })).map_err(Error::from)?; - - // TODO: Possibly not required. Keeping in place to - // avoid resending `false`. - let _ = state.agreement_inputs.insert(uid0, false); - } - } - - no_outgoing + if self.agreement_true_outputs.len() >= + self.num_nodes - self.num_faulty_nodes + { + let instances = &mut self.agreement_instances; + for (_uid0, instance) in instances.iter_mut() { + if !instance.has_input() { + instance.set_input(false); } - - // FIXME (missing clause): - // - // Once all instances of BA have completed, let C ⊂ [1..N] be - // the indexes of each BA that delivered 1. Wait for the output - // v_j for each RBC_j such that j∈C. Finally output ∪ j∈C v_j. - - // Catchall - _ => Err(Error::UnexpectedMessage).map_err(E::from), } } - - _ => Err(Error::UnexpectedMessage).map_err(E::from), } } -} -impl Handler for CommonSubset -where - E: From + From, -{ - fn handle(&self, m: QMessage, tx: Sender) -> Result { - self.on_message(m, &tx) - } + // FIXME (missing clause): + // + // Once all instances of BA have completed, let C ⊂ [1..N] be + // the indexes of each BA that delivered 1. Wait for the output + // v_j for each RBC_j such that j∈C. Finally output ∪ j∈C v_j. } -*/ #[derive(Clone, Debug)] pub enum Error { UnexpectedMessage, NotImplemented, NoSuchBroadcastInstance, - Send(SendError), Broadcast(broadcast::Error), -} - -impl From> for Error { - fn from(err: SendError) -> Error { - Error::Send(err) - } + Agreement(agreement::Error), } impl From for Error { @@ -211,3 +172,9 @@ impl From for Error { Error::Broadcast(err) } } + +impl From for Error { + fn from(err: agreement::Error) -> Error { + Error::Agreement(err) + } +}