From fb50e38eadd2c6e0824e0066a492200ce90e4435 Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Thu, 10 May 2018 12:09:22 +0100 Subject: [PATCH] replaced the map of estimated values with only one optional value for the current epoch --- src/agreement.rs | 81 ++++++++++++++++++++++---------------------- src/common_subset.rs | 6 ++-- 2 files changed, 44 insertions(+), 43 deletions(-) diff --git a/src/agreement.rs b/src/agreement.rs index 0cf9d94..4ea3307 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -1,7 +1,7 @@ //! Binary Byzantine agreement protocol from a common coin protocol. use itertools::Itertools; -use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::collections::{BTreeSet, HashMap, VecDeque}; use std::hash::Hash; use proto::message; @@ -51,6 +51,7 @@ impl AgreementMessage { } } +/// Binary Agreement instance. pub struct Agreement { /// The UID of the corresponding proposer node. uid: NodeUid, @@ -65,10 +66,12 @@ pub struct Agreement { sent_bval: BTreeSet, /// Values received in AUX messages. Reset on every epoch update. received_aux: HashMap, - /// Estimates of the decision value in all epochs. The first estimated value - /// is provided as input by Common Subset using the `set_input` function - /// which triggers the algorithm to start. - estimated: BTreeMap, + /// The estimate of the decision value in the current epoch. + estimated: Option, + /// The value output by the agreement instance. It is set once to `Some(b)` + /// and then never changed. That is, no instance of Binary Agreement can + /// decide on two different values of output. + output: Option, /// Termination flag. The Agreement instance doesn't terminate immediately /// upon deciding on the agreed value. This is done in order to help other /// nodes decide despite asynchrony of communication. Once the instance @@ -90,7 +93,8 @@ impl Agreement { received_bval: HashMap::new(), sent_bval: BTreeSet::new(), received_aux: HashMap::new(), - estimated: BTreeMap::new(), + estimated: None, + output: None, terminated: false, } } @@ -100,13 +104,14 @@ impl Agreement { self.terminated } + /// Sets the input value for agreement. pub fn set_input(&mut self, input: bool) -> Result { if self.epoch != 0 { return Err(Error::InputNotAccepted); } // Set the initial estimated value to the input value. - self.estimated.insert(self.epoch, input); + self.estimated = Some(input); // Receive the BVAL message locally. self.received_bval .entry(self.uid.clone()) @@ -116,8 +121,9 @@ impl Agreement { Ok(AgreementMessage::BVal((self.epoch, input))) } - pub fn has_input(&self) -> bool { - self.estimated.get(&0).is_some() + /// Acceptance check to be performed before setting the input value. + pub fn accepts_input(&self) -> bool { + self.epoch == 0 && self.estimated.is_none() } /// Receive input from a remote node. @@ -126,7 +132,7 @@ impl Agreement { /// to remote nodes. There can be up to 2 messages. pub fn handle_agreement_message( &mut self, - sender_id: NodeUid, + sender_id: &NodeUid, message: &AgreementMessage, ) -> Result { match *message { @@ -146,11 +152,11 @@ impl Agreement { } } - fn handle_bval(&mut self, sender_id: NodeUid, b: bool) -> Result { + fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result { let mut outgoing = VecDeque::new(); self.received_bval - .entry(sender_id) + .entry(sender_id.clone()) .or_insert_with(BTreeSet::new) .insert(b); let count_bval = self.received_bval @@ -172,8 +178,8 @@ impl Agreement { self.received_aux.insert(self.uid.clone(), b); } - let coin_result = self.try_coin(); - Ok((coin_result, outgoing)) + self.try_coin(); + Ok((self.output, outgoing)) } // upon receiving BVAL_r(b) messages from f + 1 nodes, if // BVAL_r(b) has not been sent, multicast BVAL_r(b) @@ -190,11 +196,11 @@ impl Agreement { } } - fn handle_aux(&mut self, sender_id: NodeUid, b: bool) -> Result { - self.received_aux.insert(sender_id, b); + fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result { + self.received_aux.insert(sender_id.clone(), b); if !self.bin_values.is_empty() { - let coin_result = self.try_coin(); - Ok((coin_result, VecDeque::new())) + self.try_coin(); + Ok((self.output, VecDeque::new())) } else { Ok((None, VecDeque::new())) } @@ -225,22 +231,24 @@ impl Agreement { /// messages are received, thus this condition may be triggered upon arrival /// of either an AUX_r or a BVAL_r message). /// - /// `try_coin` outputs an optional decision value of the agreement instance. - fn try_coin(&mut self) -> Option { + /// Once the (N - f) messages are received, gets a common coin and uses it + /// to compute the next decision estimate and, optionally, sets the output + /// decision value. + fn try_coin(&mut self) { let (count_aux, vals) = self.count_aux(); if count_aux < self.num_nodes - self.num_faulty_nodes { // Continue waiting for the (N - f) AUX messages. - return None; + return; } // FIXME: Implement the Common Coin algorithm. At the moment the // coin value is common across different nodes but not random. - let coin2 = (self.epoch % 2) == 0; + let coin = (self.epoch % 2) == 0; // Check the termination condition: "continue looping until both a // value b is output in some round r, and the value Coin_r' = b for // some round r' > r." - self.terminated = self.terminated || self.estimated.values().any(|b| *b == coin2); + self.terminated = self.terminated || self.output == Some(coin); // Start the next epoch. self.bin_values.clear(); @@ -248,26 +256,19 @@ impl Agreement { self.epoch += 1; if vals.len() != 1 { - self.estimated.insert(self.epoch, coin2); - return None; + self.estimated = Some(coin); + return; } // NOTE: `vals` has exactly one element due to `vals.len() == 1` - let output: Vec> = vals.into_iter() - .take(1) - .map(|b| { - self.estimated.insert(self.epoch, b); - if b == coin2 { - // Output the agreement value. - Some(b) - } else { - // Don't output a value. - None - } - }) - .collect(); - - output[0] + let v: Vec = vals.into_iter().collect(); + let b = v[0]; + self.estimated = Some(b); + // Setting the output value is allowed only once. + if self.output.is_none() && b == coin { + // Output the agreement value. + self.output = Some(b); + } } } diff --git a/src/common_subset.rs b/src/common_subset.rs index 5bdd064..cd6fc94 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -95,7 +95,7 @@ impl CommonSubset { /// BA_j, then provide input 1 to BA_j. See Figure 11. fn on_broadcast_result(&mut self, uid: &NodeUid) -> Result, Error> { if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) { - if !agreement_instance.has_input() { + if agreement_instance.accepts_input() { Ok(Some(agreement_instance.set_input(true)?)) } else { Ok(None) @@ -168,7 +168,7 @@ impl CommonSubset { } else { // Send the message to the agreement instance. agreement_instance - .handle_agreement_message(sender_id.clone(), &amessage) + .handle_agreement_message(sender_id, &amessage) .map_err(Error::from) } } @@ -209,7 +209,7 @@ impl CommonSubset { if results1 >= self.num_nodes - self.num_faulty_nodes { for instance in self.agreement_instances.values_mut() { - if !instance.has_input() { + if instance.accepts_input() { outgoing.push_back(instance.set_input(false)?); } }