diff --git a/src/agreement/bin_values.rs b/src/agreement/bin_values.rs index 12081bc..c3fab43 100644 --- a/src/agreement/bin_values.rs +++ b/src/agreement/bin_values.rs @@ -1,5 +1,6 @@ use std::iter::FromIterator; use std::mem::replace; +use std::slice; /// A lattice-valued description of the state of `bin_values`, essentially the same as the set of /// subsets of `bool`. @@ -109,3 +110,24 @@ impl FromIterator for BinValues { v } } + +// Statically allocated slices for constructing `BinValues` iterators: + +const NONE: &[bool] = &[]; +const FALSE: &[bool] = &[false]; +const TRUE: &[bool] = &[true]; +const BOTH: &[bool] = &[false, true]; + +impl IntoIterator for BinValues { + type Item = &'static bool; + type IntoIter = slice::Iter<'static, bool>; + + fn into_iter(self) -> Self::IntoIter { + match self { + BinValues::None => NONE.into_iter(), + BinValues::False => FALSE.into_iter(), + BinValues::True => TRUE.into_iter(), + BinValues::Both => BOTH.into_iter(), + } + } +} diff --git a/src/agreement/mod.rs b/src/agreement/mod.rs index ffb8319..3855135 100644 --- a/src/agreement/mod.rs +++ b/src/agreement/mod.rs @@ -15,32 +15,32 @@ //! All messages are annotated with the epoch they belong to, but we omit that here for brevity. //! //! * At the beginning of each epoch, we multicast `BVal(e)`. It translates to: "I know that `e` is -//! a viable output." +//! a viable output." //! //! * Once we receive `BVal(v)` with the same value from _f + 1_ different validators, we know that //! at least one of them must be correct. So we know that `v` is a viable output. If we haven't //! done so already we multicast `BVal(v)`. (Even if we already multicast `BVal(!v)`). //! //! * Let's say a node _believes in `v`_ if it received `BVal(v)` from _2 f + 1_ validators. -//! For the _first_ value `v` we believe in, we multicast `Aux(v)`. It translates to: -//! "I know that all correct nodes will eventually know that `v` is a viable output. -//! I'm not sure about `!v` yet." +//! For the _first_ value `v` we believe in, we multicast `Aux(v)`. It translates to: +//! "I know that all correct nodes will eventually know that `v` is a viable output. +//! I'm not sure about `!v` yet." //! //! * Since every node will receive at least _2 f + 1_ `BVal` messages from correct validators, -//! there is at least one value `v`, such that every node receives _f + 1_ `BVal(v)` messages. -//! As a consequence, every correct validator will multicast `BVal(v)` itself. Hence we are -//! guaranteed to receive _2 f + 1_ `BVal(v)` messages. -//! In short: If _any_ correct node believes in `v`, _every_ correct node will. +//! there is at least one value `v`, such that every node receives _f + 1_ `BVal(v)` messages. +//! As a consequence, every correct validator will multicast `BVal(v)` itself. Hence we are +//! guaranteed to receive _2 f + 1_ `BVal(v)` messages. +//! In short: If _any_ correct node believes in `v`, _every_ correct node will. //! //! * Every correct node will eventually send exactly one `Aux`, so we will receive at least -//! _2 f + 1_ `Aux` messages with values we believe in. At that point, we define the set `vals` -//! of _candidate values_: the set of values we believe in _and_ have received in an `Aux`. +//! _N - f_ `Aux` messages with values we believe in. At that point, we define the set `vals` +//! of _candidate values_: the set of values we believe in _and_ have received in an `Aux`. //! //! * Once we have the set of candidate values, we obtain a _coin value_ `s` (see below). //! //! * If there is only a single candidate value `b`, we set our estimate `e = b`. If `s == b`, -//! we _output_ and send a `Term(b)` message which is interpreted as `BVal(b)` and `Aux(b)` for -//! all future epochs. If `s != b`, we just proceed to the next epoch. +//! we _output_ and send a `Term(b)` message which is interpreted as `BVal(b)` and `Aux(b)` for +//! all future epochs. If `s != b`, we just proceed to the next epoch. //! //! * If both values are candidates, we set `e = s` and proceed to the next epoch. //! @@ -57,7 +57,7 @@ //! * We multicast a `Conf` message containing our candidate values. //! //! * Since every good node believes in all values it puts into its `Conf` message, we will -//! eventually receive _2 f + 1_ `Conf` messages containing only values we believe in. Then we +//! eventually receive _N - f_ `Conf` messages containing only values we believe in. Then we //! trigger the common coin. //! //! * After _f + 1_ nodes have sent us their coin shares, we receive the coin output and assign it @@ -68,13 +68,13 @@ pub mod bin_values; use rand; use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Debug; -use std::mem::replace; use std::sync::Arc; use itertools::Itertools; use agreement::bin_values::BinValues; use common_coin::{self, CommonCoin, CommonCoinMessage}; +use fault_log::{Fault, FaultKind}; use messaging::{self, DistAlgorithm, NetworkInfo, Target}; error_chain!{ @@ -114,6 +114,14 @@ impl AgreementContent { content: self, } } + + /// Returns `true` if this message can be ignored if its epoch has already passed. + pub fn can_expire(&self) -> bool { + match *self { + AgreementContent::Term(_) => false, + _ => true, + } + } } /// Messages sent during the binary Byzantine agreement stage. @@ -143,13 +151,30 @@ impl rand::Rand for AgreementContent { } } -/// Possible values of the common coin schedule defining the method to derive the common coin in a -/// given epoch: as a constant value or a distributed computation. +/// The state of the current epoch's common coin. In some epochs this is fixed, in others it starts +/// with in `InProgress`. #[derive(Debug)] -enum CoinSchedule { - False, - True, - Random, +enum CoinState { + /// The value was fixed in the current epoch, or the coin has already terminated. + Decided(bool), + /// The coin value is not known yet. + InProgress(CommonCoin), +} + +impl CoinState { + /// Returns the value, if this coin has already decided. + fn value(&self) -> Option { + match self { + CoinState::Decided(value) => Some(*value), + CoinState::InProgress(_) => None, + } + } +} + +impl From for CoinState { + fn from(value: bool) -> Self { + CoinState::Decided(value) + } } /// Binary Agreement instance @@ -166,15 +191,16 @@ pub struct Agreement { /// Bin values. Reset on every epoch update. bin_values: BinValues, /// Values received in `BVal` messages. Reset on every epoch update. - received_bval: BTreeMap>, + received_bval: BTreeMap>, /// Sent `BVal` values. Reset on every epoch update. sent_bval: BTreeSet, /// Values received in `Aux` messages. Reset on every epoch update. - received_aux: BTreeMap, + received_aux: BTreeMap>, /// Received `Conf` messages. Reset on every epoch update. received_conf: BTreeMap, - /// Received `Term` messages. Kept throughout epoch updates. - received_term: BTreeMap, + /// Received `Term` messages. Kept throughout epoch updates. These count as `BVal`, `Aux` and + /// `Conf` messages for all future epochs. + received_term: BTreeMap>, /// The estimate of the decision value in the current epoch. estimated: Option, /// A permanent, latching copy of the output value. This copy is required because `output` can @@ -185,17 +211,11 @@ pub struct Agreement { decision: Option, /// A cache for messages for future epochs that cannot be handled yet. // TODO: Find a better solution for this; defend against spam. - incoming_queue: Vec<(NodeUid, AgreementMessage)>, - /// Termination flag. Once the instance determines that all the remote nodes have reached - /// agreement or have the necessary information to reach agreement, it sets the `terminated` - /// flag and accepts no more incoming messages. - terminated: bool, - /// Whether the `Conf` message round has started in the current epoch. - conf_round: bool, - /// A common coin instance. It is reset on epoch update. - common_coin: CommonCoin, - /// Common coin schedule computed at the start of each epoch. - coin_schedule: CoinSchedule, + incoming_queue: BTreeMap>, + /// The values we found in the first _N - f_ `Aux` messages that were in `bin_values`. + conf_values: Option, + /// The state of this epoch's common coin. + coin_state: CoinState, } pub type Step = messaging::Step>; @@ -215,29 +235,24 @@ impl DistAlgorithm for Agreement { fn handle_message( &mut self, sender_id: &Self::NodeUid, - message: Self::Message, + AgreementMessage { epoch, content }: Self::Message, ) -> Result> { - if self.terminated || message.epoch < self.epoch { + if self.decision.is_some() || (epoch < self.epoch && content.can_expire()) { // Message is obsolete: We are already in a later epoch or terminated. Ok(Step::default()) - } else if message.epoch > self.epoch { + } else if epoch > self.epoch { // Message is for a later epoch. We can't handle that yet. - self.incoming_queue.push((sender_id.clone(), message)); + let queue = self.incoming_queue.entry(epoch).or_insert_with(Vec::new); + queue.push((sender_id.clone(), content)); Ok(Step::default()) } else { - match message.content { - AgreementContent::BVal(b) => self.handle_bval(sender_id, b), - AgreementContent::Aux(b) => self.handle_aux(sender_id, b), - AgreementContent::Conf(v) => self.handle_conf(sender_id, v), - AgreementContent::Term(v) => self.handle_term(sender_id, v), - AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg), - } + self.handle_message_content(sender_id, content) } } /// Whether the algorithm has terminated. fn terminated(&self) -> bool { - self.terminated + self.decision.is_some() } fn our_id(&self) -> &Self::NodeUid { @@ -251,33 +266,26 @@ impl Agreement { session_id: u64, proposer_id: NodeUid, ) -> Result { - let invocation_id = netinfo.invocation_id(); - if let Some(proposer_i) = netinfo.node_index(&proposer_id) { - Ok(Agreement { - netinfo: netinfo.clone(), - session_id, - proposer_id, - epoch: 0, - bin_values: BinValues::new(), - received_bval: BTreeMap::new(), - sent_bval: BTreeSet::new(), - received_aux: BTreeMap::new(), - received_conf: BTreeMap::new(), - received_term: BTreeMap::new(), - estimated: None, - decision: None, - incoming_queue: Vec::new(), - terminated: false, - conf_round: false, - common_coin: CommonCoin::new( - netinfo, - Nonce::new(invocation_id.as_ref(), session_id, proposer_i, 0), - ), - coin_schedule: CoinSchedule::True, - }) - } else { - Err(ErrorKind::UnknownProposer.into()) + if !netinfo.is_node_validator(&proposer_id) { + return Err(ErrorKind::UnknownProposer.into()); } + Ok(Agreement { + netinfo, + session_id, + proposer_id, + epoch: 0, + bin_values: BinValues::new(), + received_bval: BTreeMap::new(), + sent_bval: BTreeSet::new(), + received_aux: BTreeMap::new(), + received_conf: BTreeMap::new(), + received_term: BTreeMap::new(), + estimated: None, + decision: None, + incoming_queue: BTreeMap::new(), + conf_values: None, + coin_state: CoinState::Decided(true), + }) } /// Sets the input value for agreement. @@ -285,17 +293,11 @@ impl Agreement { if self.epoch != 0 || self.estimated.is_some() { return Err(ErrorKind::InputNotAccepted.into()); } - if self.netinfo.num_nodes() == 1 { - let mut step = self.send_bval(input)?; - step.extend(self.send_aux(input)?); - step.extend(self.decide(input)); - Ok(step) - } else { - // Set the initial estimated value to the input value. - self.estimated = Some(input); - // Record the input value as sent. - self.send_bval(input) - } + // Set the initial estimated value to the input value. + self.estimated = Some(input); + debug!("{:?}/{:?} Input {}", self.our_id(), self.proposer_id, input); + // Record the input value as sent. + self.send_bval(input) } /// Acceptance check to be performed before setting the input value. @@ -303,153 +305,93 @@ impl Agreement { self.epoch == 0 && self.estimated.is_none() } + /// Dispatches the message content to the corresponding handling method. + fn handle_message_content( + &mut self, + sender_id: &NodeUid, + content: AgreementContent, + ) -> Result> { + match content { + AgreementContent::BVal(b) => self.handle_bval(sender_id, b), + AgreementContent::Aux(b) => self.handle_aux(sender_id, b), + AgreementContent::Conf(v) => self.handle_conf(sender_id, v), + AgreementContent::Term(v) => self.handle_term(sender_id, v), + AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg), + } + } + + /// Handles a `BVal(b)` message. + /// + /// Upon receiving _f + 1_ `BVal(b)`, multicast `BVal(b)`. Upon receiving _2 f + 1_ `BVal(b)`, + /// update `bin_values`. When `bin_values` gets its first entry, multicast `Aux(b)`. If the + /// condition is met, starts the `Conf` round or decides. (See `on_bval_or_aux`.) fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result> { - self.received_bval - .entry(sender_id.clone()) - .or_insert_with(BTreeSet::new) - .insert(b); - let count_bval = self - .received_bval - .values() - .filter(|values| values.contains(&b)) - .count(); + let count_bval = { + let entry = self.received_bval.entry(b).or_insert_with(BTreeSet::new); + if !entry.insert(sender_id.clone()) { + return Ok(Fault::new(sender_id.clone(), FaultKind::DuplicateBVal).into()); + } + entry.len() + }; let mut step = Step::default(); - // upon receiving `BVal(b)` messages from 2f + 1 nodes, - // bin_values := bin_values ∪ {b} if count_bval == 2 * self.netinfo.num_faulty() + 1 { - let previous_bin_values = self.bin_values; - let bin_values_changed = self.bin_values.insert(b); + self.bin_values.insert(b); - // wait until bin_values != 0, then multicast `Aux(w)` - // where w ∈ bin_values - if previous_bin_values == BinValues::None { - // Send an `Aux` message at most once per epoch. - step.extend(self.send_aux(b)?); - } - if bin_values_changed { - step.extend(self.on_bin_values_changed()?); + if self.bin_values != BinValues::Both { + step.extend(self.send(AgreementContent::Aux(b))?) // First entry: send `Aux(b)`. + } else { + step.extend(self.on_bval_or_aux()?); // Otherwise just check for `Conf` condition. } } - if count_bval == self.netinfo.num_faulty() + 1 && !self.sent_bval.contains(&b) { - // upon receiving `BVal(b)` messages from f + 1 nodes, if - // `BVal(b)` has not been sent, multicast `BVal(b)` + if count_bval == self.netinfo.num_faulty() + 1 { step.extend(self.send_bval(b)?); } + Ok(step) } - /// Called when `bin_values` changes as a result of receiving a `BVal` message. Tries to update - /// the epoch. - fn on_bin_values_changed(&mut self) -> Result> { - match self.coin_schedule { - CoinSchedule::False => { - let (aux_count, aux_vals) = self.count_aux(); - if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() { - self.on_coin(false, aux_vals.definite()) - } else { - Ok(Step::default()) - } - } - CoinSchedule::True => { - let (aux_count, aux_vals) = self.count_aux(); - if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() { - self.on_coin(true, aux_vals.definite()) - } else { - Ok(Step::default()) - } - } - CoinSchedule::Random => { - // If the `Conf` round has already started, a change in `bin_values` can lead to its - // end. Try if it has indeed finished. - self.try_finish_conf_round() - } - } - } - - fn send_bval(&mut self, b: bool) -> Result> { - if !self.netinfo.is_validator() { - return Ok(Step::default()); - } - // Record the value `b` as sent. - self.sent_bval.insert(b); - // Multicast `BVal`. - let msg = AgreementContent::BVal(b).with_epoch(self.epoch); - let mut step: Step = Target::All.message(msg).into(); - // Receive the `BVal` message locally. - let our_uid = &self.netinfo.our_uid().clone(); - step.extend(self.handle_bval(our_uid, b)?); - Ok(step) - } - - fn send_conf(&mut self) -> Result> { - if self.conf_round { - // Only one `Conf` message is allowed in an epoch. - return Ok(Step::default()); - } - - // Trigger the start of the `Conf` round. - self.conf_round = true; - - if !self.netinfo.is_validator() { - return Ok(Step::default()); - } - - let v = self.bin_values; - // Multicast `Conf`. - let msg = AgreementContent::Conf(v).with_epoch(self.epoch); - let mut step: Step = Target::All.message(msg).into(); - // Receive the `Conf` message locally. - let our_uid = &self.netinfo.our_uid().clone(); - step.extend(self.handle_conf(our_uid, v)?); - Ok(step) - } - - /// Waits until at least (N − f) `Aux` messages have been received, such that - /// the set of values carried by these messages, vals, are a subset of - /// bin_values (note that bin_values_r may continue to change as `BVal` - /// messages are received, thus this condition may be triggered upon arrival - /// of either an `Aux` or a `BVal` message). + /// Handles an `Aux` message. + /// + /// If the condition is met, starts the `Conf` round or decides. (See `on_bval_or_aux`.) fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result> { // Perform the `Aux` message round only if a `Conf` round hasn't started yet. - if self.conf_round { + if self.conf_values.is_some() { return Ok(Step::default()); } - self.received_aux.insert(sender_id.clone(), b); - if self.bin_values == BinValues::None { - return Ok(Step::default()); - } - let (aux_count, aux_vals) = self.count_aux(); - if aux_count < self.netinfo.num_nodes() - self.netinfo.num_faulty() { - // Continue waiting for the (N - f) `Aux` messages. - return Ok(Step::default()); - } - - // Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...` - match self.coin_schedule { - CoinSchedule::False => self.on_coin(false, aux_vals.definite()), - CoinSchedule::True => self.on_coin(true, aux_vals.definite()), - CoinSchedule::Random => self.send_conf(), // Start the `Conf` message round. + // TODO: Detect duplicate `Aux` messages and report faults. + if !self + .received_aux + .entry(b) + .or_insert_with(BTreeSet::new) + .insert(sender_id.clone()) + { + return Ok(Fault::new(sender_id.clone(), FaultKind::DuplicateAux).into()); } + self.on_bval_or_aux() } + /// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have + /// been received, updates the epoch or decides. fn handle_conf(&mut self, sender_id: &NodeUid, v: BinValues) -> Result> { self.received_conf.insert(sender_id.clone(), v); self.try_finish_conf_round() } - /// Receives a `Term(v)` message. If we haven't yet decided on a value and there are more than - /// `num_faulty` such messages with the same value from different nodes, performs expedite - /// termination: decides on `v`, broadcasts `Term(v)` and terminates the instance. + /// Handles a `Term(v)` message. If we haven't yet decided on a value and there are more than + /// _f_ such messages with the same value from different nodes, performs expedite termination: + /// decides on `v`, broadcasts `Term(v)` and terminates the instance. fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Result> { - self.received_term.insert(sender_id.clone(), b); + self.received_term + .entry(b) + .or_insert_with(BTreeSet::new) + .insert(sender_id.clone()); // Check for the expedite termination condition. - if self.decision.is_none() - && self.received_term.iter().filter(|(_, &c)| b == c).count() - > self.netinfo.num_faulty() - { + if self.decision.is_some() { + Ok(Step::default()) + } else if self.received_term[&b].len() > self.netinfo.num_faulty() { Ok(self.decide(b)) } else { // Otherwise handle the `Term` as a `BVal`, `Aux` and `Conf`. @@ -467,75 +409,138 @@ impl Agreement { sender_id: &NodeUid, msg: CommonCoinMessage, ) -> Result> { - let coin_step = self.common_coin.handle_message(sender_id, msg)?; + let coin_step = match self.coin_state { + CoinState::Decided(_) => return Ok(Step::default()), // Coin value is already decided. + CoinState::InProgress(ref mut common_coin) => { + common_coin.handle_message(sender_id, msg)? + } + }; self.on_coin_step(coin_step) } + /// Checks whether there are _N - f_ `Aux` messages with values in `bin_values`. If so, starts + /// the `Conf` round or decides. + fn on_bval_or_aux(&mut self) -> Result> { + if self.bin_values == BinValues::None || self.conf_values.is_some() { + return Ok(Step::default()); + } + let (aux_count, aux_vals) = self.count_aux(); + if aux_count < self.netinfo.num_correct() { + return Ok(Step::default()); + } + // Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...` + match self.coin_state { + CoinState::Decided(_) => { + self.conf_values = Some(aux_vals); + self.try_update_epoch() + } + CoinState::InProgress(_) => self.send_conf(aux_vals), // Start the `Conf` message round. + } + } + + /// Multicasts a `BVal(b)` message, and handles it. + fn send_bval(&mut self, b: bool) -> Result> { + // Record the value `b` as sent. If it was already there, don't send it again. + if !self.sent_bval.insert(b) { + return Ok(Step::default()); + } + self.send(AgreementContent::BVal(b)) + } + + /// Multicasts a `Conf(values)` message, and handles it. + fn send_conf(&mut self, values: BinValues) -> Result> { + if self.conf_values.is_some() { + // Only one `Conf` message is allowed in an epoch. + return Ok(Step::default()); + } + + // Trigger the start of the `Conf` round. + self.conf_values = Some(values); + + if !self.netinfo.is_validator() { + return Ok(self.try_finish_conf_round()?); + } + + self.send(AgreementContent::Conf(values)) + } + + /// Multicasts and handles a message. Does nothing if we are only an observer. + fn send(&mut self, content: AgreementContent) -> Result> { + if !self.netinfo.is_validator() { + return Ok(Step::default()); + } + let mut step: Step<_> = Target::All + .message(content.clone().with_epoch(self.epoch)) + .into(); + let our_uid = &self.netinfo.our_uid().clone(); + step.extend(self.handle_message_content(our_uid, content)?); + Ok(step) + } + + /// Handles a step returned from the `CommonCoin`. fn on_coin_step( &mut self, coin_step: common_coin::Step, ) -> Result> { let mut step = Step::default(); let epoch = self.epoch; - let coin_output = step.extend_with(coin_step, |c_msg| { - AgreementContent::Coin(Box::new(c_msg)).with_epoch(epoch) - }); + let to_msg = |c_msg| AgreementContent::Coin(Box::new(c_msg)).with_epoch(epoch); + let coin_output = step.extend_with(coin_step, to_msg); if let Some(coin) = coin_output.into_iter().next() { - let def_bin_value = self.count_conf().1.definite(); - step.extend(self.on_coin(coin, def_bin_value)?); + self.coin_state = coin.into(); + step.extend(self.try_update_epoch()?); } Ok(step) } - /// When the common coin has been computed, tries to decide on an output value, updates the - /// `Agreement` epoch and handles queued messages for the new epoch. - fn on_coin(&mut self, coin: bool, def_bin_value: Option) -> Result> { - if self.terminated { + /// If this epoch's coin value or conf values are not known yet, does nothing, otherwise + /// updates the epoch or decides. + /// + /// With two conf values, the next epoch's estimate is the coin value. If there is only one conf + /// value and that disagrees with the coin, the conf value is the next epoch's estimate. If + /// the unique conf value agrees with the coin, terminates and decides on that value. + fn try_update_epoch(&mut self) -> Result> { + if self.decision.is_some() { // Avoid an infinite regression without making an Agreement step. return Ok(Step::default()); } + let coin = match self.coin_state.value() { + None => return Ok(Step::default()), // Still waiting for coin value. + Some(coin) => coin, + }; + let def_bin_value = match self.conf_values { + None => return Ok(Step::default()), // Still waiting for conf value. + Some(ref values) => values.definite(), + }; - if self.decision.is_none() && Some(coin) == def_bin_value { - return Ok(self.decide(coin)); + if Some(coin) == def_bin_value { + Ok(self.decide(coin)) + } else { + self.update_epoch(def_bin_value.unwrap_or(coin)) } - - let b = def_bin_value.unwrap_or(coin); - - self.update_epoch(); - - self.estimated = Some(b); - let mut step = self.send_bval(b)?; - // Create a temporary map of received TERM messages to avoid a second mutable access to - // `self`. - let received_term = replace(&mut self.received_term, BTreeMap::new()); - for (sender_id, b) in received_term { - step.extend(self.handle_term(&sender_id, b)?); - if self.terminated { - return Ok(step); - } - } - let queued_msgs = replace(&mut self.incoming_queue, Vec::new()); - for (sender_id, msg) in queued_msgs { - step.extend(self.handle_message(&sender_id, msg)?); - if self.terminated { - break; - } - } - Ok(step) } - /// Computes the coin schedule for the current `Agreement` epoch. - fn coin_schedule(&self) -> CoinSchedule { + /// Creates the initial coin state for the current epoch, i.e. sets it to the predetermined + /// value, or initializes a `CommonCoin` instance. + fn coin_state(&self) -> CoinState { match self.epoch % 3 { - 0 => CoinSchedule::True, - 1 => CoinSchedule::False, - _ => CoinSchedule::Random, + 0 => CoinState::Decided(true), + 1 => CoinState::Decided(false), + _ => { + let nonce = Nonce::new( + self.netinfo.invocation_id().as_ref(), + self.session_id, + self.netinfo.node_index(&self.proposer_id).unwrap(), + self.epoch, + ); + CoinState::InProgress(CommonCoin::new(self.netinfo.clone(), nonce)) + } } } /// Decides on a value and broadcasts a `Term` message with that value. fn decide(&mut self, b: bool) -> Step { - if self.terminated { + if self.decision.is_some() { return Step::default(); } // Output the agreement value. @@ -551,94 +556,87 @@ impl Agreement { b ); if self.netinfo.is_validator() { - let msg = AgreementContent::Term(b).with_epoch(self.epoch); + let msg = AgreementContent::Term(b).with_epoch(self.epoch + 1); step.messages.push_back(Target::All.message(msg)); - self.received_term.insert(self.netinfo.our_uid().clone(), b); } - self.terminated = true; step } + /// Checks whether the _N - f_ `Conf` messages have arrived, and if so, activates the coin. fn try_finish_conf_round(&mut self) -> Result> { - if self.conf_round - && self.count_conf().0 >= self.netinfo.num_nodes() - self.netinfo.num_faulty() - { - // Invoke the common coin. - let coin_step = self.common_coin.input(())?; - self.on_coin_step(coin_step) - } else { - // Continue waiting for (N - f) `Conf` messages - Ok(Step::default()) - } - } - - fn send_aux(&mut self, b: bool) -> Result> { - if !self.netinfo.is_validator() { + if self.conf_values.is_none() || self.count_conf() < self.netinfo.num_correct() { return Ok(Step::default()); } - // Multicast `Aux`. - let mut step: Step = Target::All - .message(AgreementContent::Aux(b).with_epoch(self.epoch)) - .into(); - // Receive the `Aux` message locally. - let our_uid = &self.netinfo.our_uid().clone(); - step.extend(self.handle_aux(our_uid, b)?); + + // Invoke the common coin. + let coin_step = match self.coin_state { + CoinState::Decided(_) => return Ok(Step::default()), // Coin has already decided. + CoinState::InProgress(ref mut common_coin) => common_coin.input(())?, + }; + let mut step = self.on_coin_step(coin_step)?; + step.extend(self.try_update_epoch()?); Ok(step) } + /// Counts the number of received `Conf` messages with values in `bin_values`. + fn count_conf(&self) -> usize { + let is_bin_val = |conf: &&BinValues| conf.is_subset(self.bin_values); + self.received_conf.values().filter(is_bin_val).count() + } + /// The count of `Aux` messages such that the set of values carried by those messages is a - /// subset of bin_values_r. The count of matching `Term` messages from terminated nodes is also - /// added to the count of `Aux` messages as witnesses of the terminated nodes' decision. + /// subset of `bin_values`. /// - /// In general, we can't expect every good node to send the same `Aux` value, so waiting for N - - /// f agreeing messages would not always terminate. We can, however, expect every good node to - /// send an `Aux` value that will eventually end up in our `bin_values`. + /// In general, we can't expect every good node to send the same `Aux` value, so waiting for + /// _N - f_ agreeing messages would not always terminate. We can, however, expect every good + /// node to send an `Aux` value that will eventually end up in our `bin_values`. fn count_aux(&self) -> (usize, BinValues) { - let aux: BTreeMap<_, _> = self - .received_aux - .iter() - .filter(|(_, &b)| self.bin_values.contains(b)) - .collect(); - - let bin: BinValues = aux.values().map(|&&v| BinValues::from_bool(v)).collect(); - (aux.len(), bin) + let mut values = BinValues::None; + let mut count = 0; + for b in self.bin_values { + let b_count = self.received_aux.get(b).map_or(0, BTreeSet::len); + if b_count > 0 { + values.insert(*b); + count += b_count; + } + } + (count, values) } - /// Counts the number of received `Conf` messages. - fn count_conf(&self) -> (usize, BinValues) { - let (vals_cnt, vals) = self - .received_conf - .values() - .filter(|&conf| conf.is_subset(self.bin_values)) - .tee(); - - (vals_cnt.count(), vals.cloned().collect()) - } - - fn update_epoch(&mut self) { + /// Increments the epoch, sets the new estimate and handles queued messages. + fn update_epoch(&mut self, b: bool) -> Result> { self.bin_values.clear(); - self.received_bval.clear(); + self.received_bval = self.received_term.clone(); self.sent_bval.clear(); - self.received_aux.clear(); + self.received_aux = self.received_term.clone(); self.received_conf.clear(); - self.conf_round = false; + for (v, ids) in &self.received_term { + for id in ids { + self.received_conf + .insert(id.clone(), BinValues::from_bool(*v)); + } + } + self.conf_values = None; self.epoch += 1; - let nonce = Nonce::new( - self.netinfo.invocation_id().as_ref(), - self.session_id, - self.netinfo.node_index(&self.proposer_id).unwrap(), - self.epoch, - ); - // TODO: Don't spend time creating a `CommonCoin` instance in epochs where the common coin - // is known. - self.common_coin = CommonCoin::new(self.netinfo.clone(), nonce); - self.coin_schedule = self.coin_schedule(); + self.coin_state = self.coin_state(); debug!( - "{:?} Agreement instance {:?} started epoch {}", + "{:?} Agreement instance {:?} started epoch {}, {} terminated", self.netinfo.our_uid(), self.proposer_id, - self.epoch + self.epoch, + self.received_conf.len(), ); + + self.estimated = Some(b); + let mut step = self.send_bval(b)?; + let queued_msgs = Itertools::flatten(self.incoming_queue.remove(&self.epoch).into_iter()); + for (sender_id, content) in queued_msgs { + step.extend(self.handle_message_content(&sender_id, content)?); + if self.decision.is_some() { + break; + } + } + Ok(step) } } diff --git a/src/broadcast.rs b/src/broadcast.rs index 6110fbd..b48ef65 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -444,9 +444,7 @@ impl Broadcast { // Save the proof for reconstructing the tree later. self.echos.insert(sender_id.clone(), p); - if self.ready_sent - || self.count_echos(&hash) < self.netinfo.num_nodes() - self.netinfo.num_faulty() - { + if self.ready_sent || self.count_echos(&hash) < self.netinfo.num_correct() { return self.compute_output(&hash); } diff --git a/src/common_coin.rs b/src/common_coin.rs index 1941b22..5aae5fd 100644 --- a/src/common_coin.rs +++ b/src/common_coin.rs @@ -166,20 +166,20 @@ where } fn try_output(&mut self) -> Result> { - let received_shares = &self.received_shares; debug!( "{:?} received {} shares, had_input = {}", self.netinfo.our_uid(), - received_shares.len(), + self.received_shares.len(), self.had_input ); - if self.had_input && received_shares.len() > self.netinfo.num_faulty() { + if self.had_input && self.received_shares.len() > self.netinfo.num_faulty() { let sig = self.combine_and_verify_sig()?; // Output the parity of the verified signature. let parity = sig.parity(); debug!("{:?} output {}", self.netinfo.our_uid(), parity); self.terminated = true; - Ok(Step::default().with_output(parity)) + let step = self.input(())?; // Before terminating, make sure we sent our share. + Ok(step.with_output(parity)) } else { Ok(Step::default()) } diff --git a/src/common_subset.rs b/src/common_subset.rs index eb4626c..22a67ce 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -247,7 +247,7 @@ impl CommonSubset { self.agreement_results ); - if value && self.count_true() == self.netinfo.num_nodes() - self.netinfo.num_faulty() { + if value && self.count_true() == self.netinfo.num_correct() { // Upon delivery of value 1 from at least N − f instances of BA, provide // input 0 to each instance of BA that has not yet been provided input. for (uid, agreement) in &mut self.agreement_instances { @@ -271,8 +271,7 @@ impl CommonSubset { } fn try_agreement_completion(&mut self) -> Option> { - if self.decided || self.count_true() < self.netinfo.num_nodes() - self.netinfo.num_faulty() - { + if self.decided || self.count_true() < self.netinfo.num_correct() { return None; } // Once all instances of BA have completed, let C ⊂ [1..N] be diff --git a/src/fault_log.rs b/src/fault_log.rs index 529e943..738c432 100644 --- a/src/fault_log.rs +++ b/src/fault_log.rs @@ -37,6 +37,10 @@ pub enum FaultKind { InvalidVoteSignature, /// A validator committed an invalid vote in `DynamicHoneyBadger`. InvalidCommittedVote, + /// `Agreement` received a duplicate `BVal` message. + DuplicateBVal, + /// `Agreement` received a duplicate `Aux` message. + DuplicateAux, } /// A structure representing the context of a faulty node. This structure diff --git a/src/messaging.rs b/src/messaging.rs index 0a23ce7..a924130 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -275,17 +275,23 @@ impl NetworkInfo { self.public_keys.keys() } - /// The total number of nodes. + /// The total number _N_ of nodes. pub fn num_nodes(&self) -> usize { self.num_nodes } - /// The maximum number of faulty, Byzantine nodes up to which Honey Badger is guaranteed to be - /// correct. + /// The maximum number _f_ of faulty, Byzantine nodes up to which Honey Badger is guaranteed to + /// be correct. pub fn num_faulty(&self) -> usize { self.num_faulty } + /// The minimum number _N - f_ of correct nodes with which Honey Badger is guaranteed to be + /// correct. + pub fn num_correct(&self) -> usize { + self.num_nodes - self.num_faulty + } + /// Returns our secret key share for threshold cryptography. pub fn secret_key_share(&self) -> &SecretKeyShare { &self.secret_key_share