From 102fa0e01d1b972967d7521908e42bd48e3888c4 Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Tue, 24 Jul 2018 11:43:35 +0200 Subject: [PATCH] Remove output and message queue from Agreement. --- src/agreement/mod.rs | 184 +++++++++++++++++++------------------------ 1 file changed, 79 insertions(+), 105 deletions(-) diff --git a/src/agreement/mod.rs b/src/agreement/mod.rs index 16a69af..a46fbdb 100644 --- a/src/agreement/mod.rs +++ b/src/agreement/mod.rs @@ -66,7 +66,7 @@ pub mod bin_values; use rand; -use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Debug; use std::mem::replace; use std::sync::Arc; @@ -75,8 +75,7 @@ use itertools::Itertools; use agreement::bin_values::BinValues; use common_coin::{self, CommonCoin, CommonCoinMessage}; -use fault_log::FaultLog; -use messaging::{self, DistAlgorithm, NetworkInfo, Target, TargetedMessage}; +use messaging::{self, DistAlgorithm, NetworkInfo, Target}; error_chain!{ links { @@ -176,10 +175,6 @@ pub struct Agreement { received_term: BTreeMap, /// The estimate of the decision value in the current epoch. estimated: Option, - /// The value output by the agreement instance. It is set once to `Some(b)` - /// and then never changed. That is, no instance of Binary Agreement can - /// decide on two different values of output. - output: Option, /// A permanent, latching copy of the output value. This copy is required because `output` can /// be consumed using `DistAlgorithm::next_output` immediately after the instance finishing to /// handle a message, in which case it would otherwise be unknown whether the output value was @@ -193,8 +188,6 @@ pub struct Agreement { /// agreement or have the necessary information to reach agreement, it sets the `terminated` /// flag and accepts no more incoming messages. terminated: bool, - /// The outgoing message queue. - messages: VecDeque, /// Whether the `Conf` message round has started in the current epoch. conf_round: bool, /// A common coin instance. It is reset on epoch update. @@ -213,8 +206,7 @@ impl DistAlgorithm for Agreement { type Error = Error; fn input(&mut self, input: Self::Input) -> Result> { - let fault_log = self.set_input(input)?; - self.step(fault_log) + self.set_input(input) } /// Receive input from a remote node. @@ -223,23 +215,22 @@ impl DistAlgorithm for Agreement { sender_id: &Self::NodeUid, message: Self::Message, ) -> Result> { - let fault_log = if self.terminated || message.epoch < self.epoch { + if self.terminated || message.epoch < self.epoch { // Message is obsolete: We are already in a later epoch or terminated. - FaultLog::new() + Ok(Step::default()) } else if message.epoch > self.epoch { // Message is for a later epoch. We can't handle that yet. self.incoming_queue.push((sender_id.clone(), message)); - FaultLog::new() + Ok(Step::default()) } else { match message.content { - AgreementContent::BVal(b) => self.handle_bval(sender_id, b)?, - AgreementContent::Aux(b) => self.handle_aux(sender_id, b)?, - AgreementContent::Conf(v) => self.handle_conf(sender_id, v)?, - AgreementContent::Term(v) => self.handle_term(sender_id, v)?, - AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg)?, + AgreementContent::BVal(b) => self.handle_bval(sender_id, b), + AgreementContent::Aux(b) => self.handle_aux(sender_id, b), + AgreementContent::Conf(v) => self.handle_conf(sender_id, v), + AgreementContent::Term(v) => Ok(self.handle_term(sender_id, v)), + AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg), } - }; - self.step(fault_log) + } } /// Whether the algorithm has terminated. @@ -272,11 +263,9 @@ impl Agreement { received_conf: BTreeMap::new(), received_term: BTreeMap::new(), estimated: None, - output: None, decision: None, incoming_queue: Vec::new(), terminated: false, - messages: VecDeque::new(), conf_round: false, common_coin: CommonCoin::new( netinfo, @@ -289,27 +278,16 @@ impl Agreement { } } - fn step(&mut self, fault_log: FaultLog) -> Result> { - Ok(Step::new( - self.output.take().into_iter().collect(), - fault_log, - self.messages - .drain(..) - .map(|msg| Target::All.message(msg)) - .collect(), - )) - } - /// Sets the input value for agreement. - fn set_input(&mut self, input: bool) -> Result> { + fn set_input(&mut self, input: bool) -> Result> { if self.epoch != 0 || self.estimated.is_some() { return Err(ErrorKind::InputNotAccepted.into()); } if self.netinfo.num_nodes() == 1 { - let mut fault_log = self.send_bval(input)?; - self.send_aux(input)?.merge_into(&mut fault_log); - self.decide(input); - Ok(fault_log) + let mut step = self.send_bval(input)?; + step.extend(self.send_aux(input)?); + step.extend(self.decide(input)); + Ok(step) } else { // Set the initial estimated value to the input value. self.estimated = Some(input); @@ -323,7 +301,7 @@ impl Agreement { self.epoch == 0 && self.estimated.is_none() } - fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result> { + fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result> { self.received_bval .entry(sender_id.clone()) .or_insert_with(BTreeSet::new) @@ -348,27 +326,27 @@ impl Agreement { } else if bin_values_changed { self.on_bin_values_changed() } else { - Ok(FaultLog::new()) + Ok(Step::default()) } } else if count_bval == self.netinfo.num_faulty() + 1 && !self.sent_bval.contains(&b) { // upon receiving `BVal(b)` messages from f + 1 nodes, if // `BVal(b)` has not been sent, multicast `BVal(b)` self.send_bval(b) } else { - Ok(FaultLog::new()) + Ok(Step::default()) } } /// Called when `bin_values` changes as a result of receiving a `BVal` message. Tries to update /// the epoch. - fn on_bin_values_changed(&mut self) -> Result> { + fn on_bin_values_changed(&mut self) -> Result> { match self.coin_schedule { CoinSchedule::False => { let (aux_count, aux_vals) = self.count_aux(); if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() { self.on_coin(false, aux_vals.definite()) } else { - Ok(FaultLog::new()) + Ok(Step::default()) } } CoinSchedule::True => { @@ -376,7 +354,7 @@ impl Agreement { if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() { self.on_coin(true, aux_vals.definite()) } else { - Ok(FaultLog::new()) + Ok(Step::default()) } } CoinSchedule::Random => { @@ -387,40 +365,42 @@ impl Agreement { } } - fn send_bval(&mut self, b: bool) -> Result> { + fn send_bval(&mut self, b: bool) -> Result> { if !self.netinfo.is_validator() { - return Ok(FaultLog::new()); + return Ok(Step::default()); } // Record the value `b` as sent. self.sent_bval.insert(b); // Multicast `BVal`. - self.messages - .push_back(AgreementContent::BVal(b).with_epoch(self.epoch)); + let msg = AgreementContent::BVal(b).with_epoch(self.epoch); + let mut step: Step = Target::All.message(msg).into(); // Receive the `BVal` message locally. let our_uid = &self.netinfo.our_uid().clone(); - self.handle_bval(our_uid, b) + step.extend(self.handle_bval(our_uid, b)?); + Ok(step) } - fn send_conf(&mut self) -> Result> { + fn send_conf(&mut self) -> Result> { if self.conf_round { // Only one `Conf` message is allowed in an epoch. - return Ok(FaultLog::new()); + return Ok(Step::default()); } // Trigger the start of the `Conf` round. self.conf_round = true; if !self.netinfo.is_validator() { - return Ok(FaultLog::new()); + return Ok(Step::default()); } let v = self.bin_values; // Multicast `Conf`. - self.messages - .push_back(AgreementContent::Conf(v).with_epoch(self.epoch)); + let msg = AgreementContent::Conf(v).with_epoch(self.epoch); + let mut step: Step = Target::All.message(msg).into(); // Receive the `Conf` message locally. let our_uid = &self.netinfo.our_uid().clone(); - self.handle_conf(our_uid, v) + step.extend(self.handle_conf(our_uid, v)?); + Ok(step) } /// Waits until at least (N − f) `Aux` messages have been received, such that @@ -428,33 +408,30 @@ impl Agreement { /// bin_values (note that bin_values_r may continue to change as `BVal` /// messages are received, thus this condition may be triggered upon arrival /// of either an `Aux` or a `BVal` message). - fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result> { + fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result> { // Perform the `Aux` message round only if a `Conf` round hasn't started yet. if self.conf_round { - return Ok(FaultLog::new()); + return Ok(Step::default()); } self.received_aux.insert(sender_id.clone(), b); if self.bin_values == BinValues::None { - return Ok(FaultLog::new()); + return Ok(Step::default()); } let (aux_count, aux_vals) = self.count_aux(); if aux_count < self.netinfo.num_nodes() - self.netinfo.num_faulty() { // Continue waiting for the (N - f) `Aux` messages. - return Ok(FaultLog::new()); + return Ok(Step::default()); } // Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...` match self.coin_schedule { CoinSchedule::False => self.on_coin(false, aux_vals.definite()), CoinSchedule::True => self.on_coin(true, aux_vals.definite()), - CoinSchedule::Random => { - // Start the `Conf` message round. - self.send_conf() - } + CoinSchedule::Random => self.send_conf(), // Start the `Conf` message round. } } - fn handle_conf(&mut self, sender_id: &NodeUid, v: BinValues) -> Result> { + fn handle_conf(&mut self, sender_id: &NodeUid, v: BinValues) -> Result> { self.received_conf.insert(sender_id.clone(), v); self.try_finish_conf_round() } @@ -462,16 +439,17 @@ impl Agreement { /// Receives a `Term(v)` message. If we haven't yet decided on a value and there are more than /// `num_faulty` such messages with the same value from different nodes, performs expedite /// termination: decides on `v`, broadcasts `Term(v)` and terminates the instance. - fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Result> { + fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Step { self.received_term.insert(sender_id.clone(), b); // Check for the expedite termination condition. if self.decision.is_none() && self.received_term.iter().filter(|(_, &c)| b == c).count() > self.netinfo.num_faulty() { - self.decide(b); + self.decide(b) + } else { + Step::default() } - Ok(FaultLog::new()) } /// Handles a Common Coin message. If there is output from Common Coin, starts the next @@ -480,7 +458,7 @@ impl Agreement { &mut self, sender_id: &NodeUid, msg: CommonCoinMessage, - ) -> Result> { + ) -> Result> { let coin_step = self.common_coin.handle_message(sender_id, msg)?; self.on_coin_step(coin_step) } @@ -488,38 +466,33 @@ impl Agreement { fn on_coin_step( &mut self, coin_step: common_coin::Step, - ) -> Result> { - let common_coin::Step { - output, - mut fault_log, - messages, - } = coin_step; + ) -> Result> { + let mut step = Step::default(); let epoch = self.epoch; - self.messages.extend(messages.into_iter().map( - |msg: TargetedMessage| { - AgreementContent::Coin(Box::new(msg.message)).with_epoch(epoch) - }, - )); - if let Some(coin) = output.into_iter().next() { + let coin_output = step.extend_with(coin_step, |c_msg| { + AgreementContent::Coin(Box::new(c_msg)).with_epoch(epoch) + }); + if let Some(coin) = coin_output.into_iter().next() { let def_bin_value = self.count_conf().1.definite(); - fault_log.extend(self.on_coin(coin, def_bin_value)?); + step.extend(self.on_coin(coin, def_bin_value)?); } - Ok(fault_log) + Ok(step) } /// When the common coin has been computed, tries to decide on an output value, updates the /// `Agreement` epoch and handles queued messages for the new epoch. - fn on_coin(&mut self, coin: bool, def_bin_value: Option) -> Result> { - let mut fault_log = FaultLog::new(); + fn on_coin(&mut self, coin: bool, def_bin_value: Option) -> Result> { if self.terminated { // Avoid an infinite regression without making an Agreement step. - return Ok(fault_log); + return Ok(Step::default()); } + let mut step = Step::default(); + let b = if let Some(b) = def_bin_value { // Outputting a value is allowed only once. if self.decision.is_none() && b == coin { - self.decide(b); + step.extend(self.decide(b)); } b } else { @@ -529,18 +502,15 @@ impl Agreement { self.update_epoch(); self.estimated = Some(b); - fault_log.extend(self.send_bval(b)?); + step.extend(self.send_bval(b)?); let queued_msgs = replace(&mut self.incoming_queue, Vec::new()); for (sender_id, msg) in queued_msgs { - let step = self.handle_message(&sender_id, msg)?; - fault_log.extend(step.fault_log); - // Save the output of the internal call. - self.output = step.output.into_iter().next(); + step.extend(self.handle_message(&sender_id, msg)?); if self.terminated { break; } } - Ok(fault_log) + Ok(step) } /// Computes the coin schedule for the current `Agreement` epoch. @@ -553,12 +523,13 @@ impl Agreement { } /// Decides on a value and broadcasts a `Term` message with that value. - fn decide(&mut self, b: bool) { + fn decide(&mut self, b: bool) -> Step { if self.terminated { - return; + return Step::default(); } // Output the agreement value. - self.output = Some(b); + let mut step = Step::default(); + step.output.push_back(b); // Latch the decided state. self.decision = Some(b); debug!( @@ -569,14 +540,15 @@ impl Agreement { b ); if self.netinfo.is_validator() { - self.messages - .push_back(AgreementContent::Term(b).with_epoch(self.epoch)); + let msg = AgreementContent::Term(b).with_epoch(self.epoch); + step.messages.push_back(Target::All.message(msg)); self.received_term.insert(self.netinfo.our_uid().clone(), b); } self.terminated = true; + step } - fn try_finish_conf_round(&mut self) -> Result> { + fn try_finish_conf_round(&mut self) -> Result> { if self.conf_round && self.count_conf().0 >= self.netinfo.num_nodes() - self.netinfo.num_faulty() { @@ -585,20 +557,22 @@ impl Agreement { self.on_coin_step(coin_step) } else { // Continue waiting for (N - f) `Conf` messages - Ok(FaultLog::new()) + Ok(Step::default()) } } - fn send_aux(&mut self, b: bool) -> Result> { + fn send_aux(&mut self, b: bool) -> Result> { if !self.netinfo.is_validator() { - return Ok(FaultLog::new()); + return Ok(Step::default()); } // Multicast `Aux`. - self.messages - .push_back(AgreementContent::Aux(b).with_epoch(self.epoch)); + let mut step: Step = Target::All + .message(AgreementContent::Aux(b).with_epoch(self.epoch)) + .into(); // Receive the `Aux` message locally. let our_uid = &self.netinfo.our_uid().clone(); - self.handle_aux(our_uid, b) + step.extend(self.handle_aux(our_uid, b)?); + Ok(step) } /// The count of `Aux` messages such that the set of values carried by those messages is a