diff --git a/src/agreement/bool_multimap.rs b/src/agreement/bool_multimap.rs new file mode 100644 index 0000000..94165cb --- /dev/null +++ b/src/agreement/bool_multimap.rs @@ -0,0 +1,67 @@ +use std::collections::{btree_set, BTreeSet}; +use std::ops::{Index, IndexMut}; + +/// A map from `bool` to `BTreeSet`. +#[derive(Debug, Clone)] +pub struct BoolMultimap([BTreeSet; 2]); + +impl Default for BoolMultimap { + fn default() -> Self { + BoolMultimap([BTreeSet::default(), BTreeSet::default()]) + } +} + +impl Index for BoolMultimap { + type Output = BTreeSet; + + fn index(&self, index: bool) -> &BTreeSet { + &self.0[if index { 1 } else { 0 }] + } +} + +impl IndexMut for BoolMultimap { + fn index_mut(&mut self, index: bool) -> &mut BTreeSet { + &mut self.0[if index { 1 } else { 0 }] + } +} + +impl<'a, N: Ord> IntoIterator for &'a BoolMultimap { + type Item = (bool, &'a N); + type IntoIter = Iter<'a, N>; + + fn into_iter(self) -> Iter<'a, N> { + Iter::new(self) + } +} + +pub struct Iter<'a, N: 'a> { + key: bool, + set_iter: btree_set::Iter<'a, N>, + map: &'a BoolMultimap, +} + +impl<'a, N: 'a + Ord> Iter<'a, N> { + fn new(map: &'a BoolMultimap) -> Self { + Iter { + key: false, + set_iter: map[false].iter(), + map, + } + } +} + +impl<'a, N: 'a + Ord> Iterator for Iter<'a, N> { + type Item = (bool, &'a N); + + fn next(&mut self) -> Option<(bool, &'a N)> { + if let Some(n) = self.set_iter.next() { + Some((self.key, n)) + } else if self.key { + None + } else { + self.key = true; + self.set_iter = self.map[true].iter(); + self.next() + } + } +} diff --git a/src/agreement/mod.rs b/src/agreement/mod.rs index 490034d..2226e28 100644 --- a/src/agreement/mod.rs +++ b/src/agreement/mod.rs @@ -63,18 +63,21 @@ //! * After _f + 1_ nodes have sent us their coin shares, we receive the coin output and assign it //! to `s`. +mod bool_multimap; pub mod bool_set; +mod sbv_broadcast; use rand; -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::BTreeMap; use std::fmt::Debug; use std::sync::Arc; use itertools::Itertools; +use self::bool_multimap::BoolMultimap; +use self::sbv_broadcast::SbvBroadcast; use agreement::bool_set::BoolSet; use common_coin::{self, CommonCoin, CommonCoinMessage}; -use fault_log::{Fault, FaultKind}; use messaging::{self, DistAlgorithm, NetworkInfo, Target}; /// An agreement error. @@ -95,10 +98,8 @@ pub type Result = ::std::result::Result; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum AgreementContent { - /// `BVal` message. - BVal(bool), - /// `Aux` message. - Aux(bool), + /// Synchronized Binary Value Broadcast message. + SbvBroadcast(sbv_broadcast::Message), /// `Conf` message. Conf(BoolSet), /// `Term` message. @@ -137,13 +138,10 @@ pub struct AgreementMessage { // with no replacement in sight. impl rand::Rand for AgreementContent { fn rand(rng: &mut R) -> Self { - let message_type = *rng - .choose(&["bval", "aux", "conf", "term", "coin"]) - .unwrap(); + let message_type = *rng.choose(&["sbvb", "conf", "term", "coin"]).unwrap(); match message_type { - "bval" => AgreementContent::BVal(rand::random()), - "aux" => AgreementContent::Aux(rand::random()), + "sbvb" => AgreementContent::SbvBroadcast(rand::random()), "conf" => AgreementContent::Conf(rand::random()), "term" => AgreementContent::Term(rand::random()), "coin" => AgreementContent::Coin(Box::new(rand::random())), @@ -189,19 +187,13 @@ pub struct Agreement { proposer_id: NodeUid, /// Agreement algorithm epoch. epoch: u32, - /// Bin values. Reset on every epoch update. - bin_values: BoolSet, - /// Values received in `BVal` messages. Reset on every epoch update. - received_bval: BTreeMap>, - /// Sent `BVal` values. Reset on every epoch update. - sent_bval: BoolSet, - /// Values received in `Aux` messages. Reset on every epoch update. - received_aux: BTreeMap>, + /// This epoch's Synchronized Binary Value Broadcast instance. + sbv_broadcast: SbvBroadcast, /// Received `Conf` messages. Reset on every epoch update. received_conf: BTreeMap, /// Received `Term` messages. Kept throughout epoch updates. These count as `BVal`, `Aux` and /// `Conf` messages for all future epochs. - received_term: BTreeMap>, + received_term: BoolMultimap, /// The estimate of the decision value in the current epoch. estimated: Option, /// A permanent, latching copy of the output value. This copy is required because `output` can @@ -271,16 +263,13 @@ impl Agreement { return Err(Error::UnknownProposer); } Ok(Agreement { - netinfo, + netinfo: netinfo.clone(), session_id, proposer_id, epoch: 0, - bin_values: bool_set::NONE, - received_bval: BTreeMap::new(), - sent_bval: bool_set::NONE, - received_aux: BTreeMap::new(), + sbv_broadcast: SbvBroadcast::new(netinfo), received_conf: BTreeMap::new(), - received_term: BTreeMap::new(), + received_term: BoolMultimap::default(), estimated: None, decision: None, incoming_queue: BTreeMap::new(), @@ -297,8 +286,8 @@ impl Agreement { // Set the initial estimated value to the input value. self.estimated = Some(input); debug!("{:?}/{:?} Input {}", self.our_id(), self.proposer_id, input); - // Record the input value as sent. - self.send_bval(input) + let sbvb_step = self.sbv_broadcast.input(input)?; + self.handle_sbvb_step(sbvb_step) } /// Acceptance check to be performed before setting the input value. @@ -313,65 +302,50 @@ impl Agreement { content: AgreementContent, ) -> Result> { match content { - AgreementContent::BVal(b) => self.handle_bval(sender_id, b), - AgreementContent::Aux(b) => self.handle_aux(sender_id, b), + AgreementContent::SbvBroadcast(msg) => self.handle_sbv_broadcast(sender_id, msg), AgreementContent::Conf(v) => self.handle_conf(sender_id, v), AgreementContent::Term(v) => self.handle_term(sender_id, v), AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg), } } - /// Handles a `BVal(b)` message. - /// - /// Upon receiving _f + 1_ `BVal(b)`, multicast `BVal(b)`. Upon receiving _2 f + 1_ `BVal(b)`, - /// update `bin_values`. When `bin_values` gets its first entry, multicast `Aux(b)`. If the - /// condition is met, starts the `Conf` round or decides. (See `on_bval_or_aux`.) - fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result> { - let count_bval = { - let entry = self.received_bval.entry(b).or_insert_with(BTreeSet::new); - if !entry.insert(sender_id.clone()) { - return Ok(Fault::new(sender_id.clone(), FaultKind::DuplicateBVal).into()); - } - entry.len() - }; - - let mut step = Step::default(); - - if count_bval == 2 * self.netinfo.num_faulty() + 1 { - self.bin_values.insert(b); - - if self.bin_values != bool_set::BOTH { - step.extend(self.send(AgreementContent::Aux(b))?) // First entry: send `Aux(b)`. - } else { - step.extend(self.on_bval_or_aux()?); // Otherwise just check for `Conf` condition. - } - } - - if count_bval == self.netinfo.num_faulty() + 1 { - step.extend(self.send_bval(b)?); - } - - Ok(step) + /// Handles a Synchroniced Binary Value Broadcast message. + fn handle_sbv_broadcast( + &mut self, + sender_id: &NodeUid, + msg: sbv_broadcast::Message, + ) -> Result> { + let sbvb_step = self.sbv_broadcast.handle_message(sender_id, msg)?; + self.handle_sbvb_step(sbvb_step) } - /// Handles an `Aux` message. - /// - /// If the condition is met, starts the `Conf` round or decides. (See `on_bval_or_aux`.) - fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result> { - // Perform the `Aux` message round only if a `Conf` round hasn't started yet. + /// Handles a Synchronized Binary Value Broadcast step. On output, starts the `Conf` round or + /// decides. + fn handle_sbvb_step( + &mut self, + sbvb_step: sbv_broadcast::Step, + ) -> Result> { + let mut step = Step::default(); + let output = step.extend_with(sbvb_step, |msg| { + AgreementContent::SbvBroadcast(msg).with_epoch(self.epoch) + }); if self.conf_values.is_some() { - return Ok(Step::default()); + return Ok(step); // The `Conf` round has already started. } - // TODO: Detect duplicate `Aux` messages and report faults. - if !self - .received_aux - .entry(b) - .or_insert_with(BTreeSet::new) - .insert(sender_id.clone()) - { - return Ok(Fault::new(sender_id.clone(), FaultKind::DuplicateAux).into()); + if let Some(aux_vals) = output.into_iter().next() { + // Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...` + match self.coin_state { + CoinState::Decided(_) => { + self.conf_values = Some(aux_vals); + step.extend(self.try_update_epoch()?) + } + CoinState::InProgress(_) => { + // Start the `Conf` message round. + step.extend(self.send_conf(aux_vals)?) + } + } } - self.on_bval_or_aux() + Ok(step) } /// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have @@ -385,19 +359,17 @@ impl Agreement { /// _f_ such messages with the same value from different nodes, performs expedite termination: /// decides on `v`, broadcasts `Term(v)` and terminates the instance. fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Result> { - self.received_term - .entry(b) - .or_insert_with(BTreeSet::new) - .insert(sender_id.clone()); + self.received_term[b].insert(sender_id.clone()); // Check for the expedite termination condition. if self.decision.is_some() { Ok(Step::default()) - } else if self.received_term[&b].len() > self.netinfo.num_faulty() { + } else if self.received_term[b].len() > self.netinfo.num_faulty() { Ok(self.decide(b)) } else { // Otherwise handle the `Term` as a `BVal`, `Aux` and `Conf`. - let mut step = self.handle_bval(sender_id, b)?; - step.extend(self.handle_aux(sender_id, b)?); + let mut sbvb_step = self.sbv_broadcast.handle_bval(sender_id, b)?; + sbvb_step.extend(self.sbv_broadcast.handle_aux(sender_id, b)?); + let mut step = self.handle_sbvb_step(sbvb_step)?; step.extend(self.handle_conf(sender_id, BoolSet::from(b))?); Ok(step) } @@ -419,35 +391,6 @@ impl Agreement { self.on_coin_step(coin_step) } - /// Checks whether there are _N - f_ `Aux` messages with values in `bin_values`. If so, starts - /// the `Conf` round or decides. - fn on_bval_or_aux(&mut self) -> Result> { - if self.bin_values == bool_set::NONE || self.conf_values.is_some() { - return Ok(Step::default()); - } - let (aux_count, aux_vals) = self.count_aux(); - if aux_count < self.netinfo.num_correct() { - return Ok(Step::default()); - } - // Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...` - match self.coin_state { - CoinState::Decided(_) => { - self.conf_values = Some(aux_vals); - self.try_update_epoch() - } - CoinState::InProgress(_) => self.send_conf(aux_vals), // Start the `Conf` message round. - } - } - - /// Multicasts a `BVal(b)` message, and handles it. - fn send_bval(&mut self, b: bool) -> Result> { - // Record the value `b` as sent. If it was already there, don't send it again. - if !self.sent_bval.insert(b) { - return Ok(Step::default()); - } - self.send(AgreementContent::BVal(b)) - } - /// Multicasts a `Conf(values)` message, and handles it. fn send_conf(&mut self, values: BoolSet) -> Result> { if self.conf_values.is_some() { @@ -583,40 +526,16 @@ impl Agreement { /// Counts the number of received `Conf` messages with values in `bin_values`. fn count_conf(&self) -> usize { - let is_bin_val = |conf: &&BoolSet| conf.is_subset(self.bin_values); + let is_bin_val = |conf: &&BoolSet| conf.is_subset(self.sbv_broadcast.bin_values()); self.received_conf.values().filter(is_bin_val).count() } - /// The count of `Aux` messages such that the set of values carried by those messages is a - /// subset of `bin_values`. - /// - /// In general, we can't expect every good node to send the same `Aux` value, so waiting for - /// _N - f_ agreeing messages would not always terminate. We can, however, expect every good - /// node to send an `Aux` value that will eventually end up in our `bin_values`. - fn count_aux(&self) -> (usize, BoolSet) { - let mut values = bool_set::NONE; - let mut count = 0; - for b in self.bin_values { - let b_count = self.received_aux.get(&b).map_or(0, BTreeSet::len); - if b_count > 0 { - values.insert(b); - count += b_count; - } - } - (count, values) - } - /// Increments the epoch, sets the new estimate and handles queued messages. fn update_epoch(&mut self, b: bool) -> Result> { - self.bin_values = bool_set::NONE; - self.received_bval = self.received_term.clone(); - self.sent_bval = bool_set::NONE; - self.received_aux = self.received_term.clone(); + self.sbv_broadcast.clear(&self.received_term); self.received_conf.clear(); - for (v, ids) in &self.received_term { - for id in ids { - self.received_conf.insert(id.clone(), BoolSet::from(*v)); - } + for (v, id) in &self.received_term { + self.received_conf.insert(id.clone(), BoolSet::from(v)); } self.conf_values = None; self.epoch += 1; @@ -630,7 +549,8 @@ impl Agreement { ); self.estimated = Some(b); - let mut step = self.send_bval(b)?; + let sbvb_step = self.sbv_broadcast.input(b)?; + let mut step = self.handle_sbvb_step(sbvb_step)?; let queued_msgs = Itertools::flatten(self.incoming_queue.remove(&self.epoch).into_iter()); for (sender_id, content) in queued_msgs { step.extend(self.handle_message_content(&sender_id, content)?); diff --git a/src/agreement/sbv_broadcast.rs b/src/agreement/sbv_broadcast.rs new file mode 100644 index 0000000..b07d338 --- /dev/null +++ b/src/agreement/sbv_broadcast.rs @@ -0,0 +1,208 @@ +//! # Synchronized Binary Value Broadcast +//! +//! This performs the `BVal` and `Aux` steps for `Agreement`. +//! +//! Validators input binary values, and each node outputs a set of one or two binary values. +//! These outputs are not necessarily the same in each node, but it is guaranteed that whenever two +//! nodes output singletons _{v}_ and _{w}_, then _v = w_. +//! +//! It will only output once, but can continue handling messages and will keep track of the set +//! `bin_values` of values for which _2 f + 1_ `BVal`s were received. + +use rand; +use std::fmt::Debug; +use std::sync::Arc; + +use super::bool_multimap::BoolMultimap; +use super::bool_set::{self, BoolSet}; +use super::{Error, Result}; +use fault_log::{Fault, FaultKind}; +use messaging::{self, DistAlgorithm, NetworkInfo, Target}; + +pub type Step = messaging::Step>; + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub enum Message { + BVal(bool), + Aux(bool), +} + +// NOTE: Extending rand_derive to correctly generate random values from boxes would make this +// implementation obsolete; however at the time of this writing, `rand::Rand` is already deprecated +// with no replacement in sight. +impl rand::Rand for Message { + fn rand(rng: &mut R) -> Self { + let message_type = *rng.choose(&["bval", "aux"]).unwrap(); + + match message_type { + "bval" => Message::BVal(rand::random()), + "aux" => Message::Aux(rand::random()), + _ => unreachable!(), + } + } +} + +#[derive(Debug)] +pub struct SbvBroadcast { + /// Shared network information. + netinfo: Arc>, + /// The set of values for which _2 f + 1_ `BVal`s have been received. + bin_values: BoolSet, + /// The nodes that sent us a `BVal(b)`, by `b`. + received_bval: BoolMultimap, + /// The values `b` for which we already sent `BVal(b)`. + sent_bval: BoolSet, + /// The nodes that sent us an `Aux(b)`, by `b`. + received_aux: BoolMultimap, + /// Whether we have already output. + terminated: bool, +} + +impl DistAlgorithm for SbvBroadcast { + type NodeUid = NodeUid; + type Input = bool; + type Output = BoolSet; + type Message = Message; + type Error = Error; + + fn input(&mut self, input: Self::Input) -> Result> { + self.send_bval(input) + } + + fn handle_message( + &mut self, + sender_id: &Self::NodeUid, + msg: Self::Message, + ) -> Result> { + match msg { + Message::BVal(b) => self.handle_bval(sender_id, b), + Message::Aux(b) => self.handle_aux(sender_id, b), + } + } + + fn terminated(&self) -> bool { + self.terminated + } + + fn our_id(&self) -> &Self::NodeUid { + self.netinfo.our_uid() + } +} + +impl SbvBroadcast { + pub fn new(netinfo: Arc>) -> Self { + SbvBroadcast { + netinfo, + bin_values: bool_set::NONE, + received_bval: BoolMultimap::default(), + sent_bval: bool_set::NONE, + received_aux: BoolMultimap::default(), + terminated: false, + } + } + + /// Resets the algorithm, but assumes the given `init` values have already been received as + /// both `BVal` and `Aux` messages. + pub fn clear(&mut self, init: &BoolMultimap) { + self.bin_values = bool_set::NONE; + self.received_bval = init.clone(); + self.sent_bval = bool_set::NONE; + self.received_aux = init.clone(); + self.terminated = false; + } + + /// Handles a `BVal(b)` message. + /// + /// Upon receiving _f + 1_ `BVal(b)`, multicasts `BVal(b)`. Upon receiving _2 f + 1_ `BVal(b)`, + /// updates `bin_values`. When `bin_values` gets its first entry, multicasts `Aux(b)`. + pub fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result> { + let count_bval = { + if !self.received_bval[b].insert(sender_id.clone()) { + return Ok(Fault::new(sender_id.clone(), FaultKind::DuplicateBVal).into()); + } + self.received_bval[b].len() + }; + + let mut step = Step::default(); + + if count_bval == 2 * self.netinfo.num_faulty() + 1 { + self.bin_values.insert(b); + + if self.bin_values != bool_set::BOTH { + step.extend(self.send(Message::Aux(b))?) // First entry: send `Aux(b)`. + } else { + step.extend(self.try_output()?); // Otherwise just check for `Conf` condition. + } + } + + if count_bval == self.netinfo.num_faulty() + 1 { + step.extend(self.send_bval(b)?); + } + + Ok(step) + } + + /// Returns the current `bin_values`: the set of `b` for which _2 f + 1_ `BVal`s were received. + pub fn bin_values(&self) -> BoolSet { + self.bin_values + } + + /// Multicasts and handles a message. Does nothing if we are only an observer. + fn send(&mut self, msg: Message) -> Result> { + if !self.netinfo.is_validator() { + return Ok(Step::default()); + } + let mut step: Step<_> = Target::All.message(msg.clone()).into(); + let our_uid = &self.netinfo.our_uid().clone(); + step.extend(self.handle_message(our_uid, msg)?); + Ok(step) + } + + /// Multicasts a `BVal(b)` message, and handles it. + fn send_bval(&mut self, b: bool) -> Result> { + // Record the value `b` as sent. If it was already there, don't send it again. + if !self.sent_bval.insert(b) { + return Ok(Step::default()); + } + self.send(Message::BVal(b)) + } + + /// Handles an `Aux` message. + pub fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result> { + if !self.received_aux[b].insert(sender_id.clone()) { + return Ok(Fault::new(sender_id.clone(), FaultKind::DuplicateAux).into()); + } + self.try_output() + } + + /// Checks whether there are _N - f_ `Aux` messages with values in `bin_values`, and outputs. + fn try_output(&mut self) -> Result> { + if self.terminated || self.bin_values == bool_set::NONE { + return Ok(Step::default()); + } + let (aux_count, aux_vals) = self.count_aux(); + if aux_count < self.netinfo.num_correct() { + return Ok(Step::default()); + } + self.terminated = true; + Ok(Step::default().with_output(aux_vals)) + } + + /// The count of `Aux` messages such that the set of values carried by those messages is a + /// subset of `bin_values`. + /// + /// In general, we can't expect every good node to send the same `Aux` value, so waiting for + /// _N - f_ agreeing messages would not always terminate. We can, however, expect every good + /// node to send an `Aux` value that will eventually end up in our `bin_values`. + fn count_aux(&self) -> (usize, BoolSet) { + let mut values = bool_set::NONE; + let mut count = 0; + for b in self.bin_values { + if !self.received_aux[b].is_empty() { + values.insert(b); + count += self.received_aux[b].len(); + } + } + (count, values) + } +}