diff --git a/src/agreement/bin_values.rs b/src/agreement/bin_values.rs index 11481d9..ac2b931 100644 --- a/src/agreement/bin_values.rs +++ b/src/agreement/bin_values.rs @@ -20,7 +20,7 @@ impl BinValues { replace(self, BinValues::None); } - fn from_bool(b: bool) -> Self { + pub fn from_bool(b: bool) -> Self { if b { BinValues::True } else { diff --git a/src/agreement/mod.rs b/src/agreement/mod.rs index 55c9f50..dc2e103 100644 --- a/src/agreement/mod.rs +++ b/src/agreement/mod.rs @@ -64,6 +64,14 @@ pub struct AgreementMessage { pub content: AgreementContent, } +/// Possible values of the common coin schedule defining the method to derive the common coin in a +/// given epoch: as a constant value or a distributed computation. +enum CoinSchedule { + False, + True, + Random, +} + /// Binary Agreement instance pub struct Agreement { /// Shared network information. @@ -113,6 +121,8 @@ pub struct Agreement { conf_round: bool, /// A common coin instance. It is reset on epoch update. common_coin: CommonCoin, + /// Common coin schedule computed at the start of each epoch. + coin_schedule: CoinSchedule, } impl DistAlgorithm for Agreement { @@ -201,6 +211,7 @@ impl Agreement { netinfo, Nonce::new(invocation_id.as_ref(), session_id, proposer_i, 0), ), + coin_schedule: CoinSchedule::True, }) } else { Err(ErrorKind::UnknownProposer.into()) @@ -252,9 +263,7 @@ impl Agreement { // Send an `Aux` message at most once per epoch. self.send_aux(b) } else if bin_values_changed { - // If the `Conf` round has already started, a change in `bin_values` can lead to its - // end. Try if it has indeed finished. - self.try_finish_conf_round() + self.on_bin_values_changed() } else { Ok(()) } @@ -267,6 +276,34 @@ impl Agreement { } } + /// Called when `bin_values` changes as a result of receiving a `BVal` message. Tries to update + /// the epoch. + fn on_bin_values_changed(&mut self) -> AgreementResult<()> { + match self.coin_schedule { + CoinSchedule::False => { + let (aux_count, aux_vals) = self.count_aux(); + if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() { + self.on_coin(false, aux_vals.definite()) + } else { + Ok(()) + } + } + CoinSchedule::True => { + let (aux_count, aux_vals) = self.count_aux(); + if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() { + self.on_coin(true, aux_vals.definite()) + } else { + Ok(()) + } + } + CoinSchedule::Random => { + // If the `Conf` round has already started, a change in `bin_values` can lead to its + // end. Try if it has indeed finished. + self.try_finish_conf_round() + } + } + } + fn send_bval(&mut self, b: bool) -> AgreementResult<()> { // Record the value `b` as sent. self.sent_bval.insert(b); @@ -309,12 +346,21 @@ impl Agreement { if self.bin_values == BinValues::None { return Ok(()); } - if self.count_aux() < self.netinfo.num_nodes() - self.netinfo.num_faulty() { + let (aux_count, aux_vals) = self.count_aux(); + if aux_count < self.netinfo.num_nodes() - self.netinfo.num_faulty() { // Continue waiting for the (N - f) `Aux` messages. return Ok(()); } - // Start the `Conf` message round. - self.send_conf() + + // Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...` + match self.coin_schedule { + CoinSchedule::False => self.on_coin(false, aux_vals.definite()), + CoinSchedule::True => self.on_coin(true, aux_vals.definite()), + CoinSchedule::Random => { + // Start the `Conf` message round. + self.send_conf() + } + } } fn handle_conf(&mut self, sender_id: &NodeUid, v: BinValues) -> AgreementResult<()> { @@ -344,29 +390,47 @@ impl Agreement { self.extend_common_coin(); if let Some(coin) = self.common_coin.next_output() { - let b = if let Some(b) = self.count_conf().1.definite() { - // Outputting a value is allowed only once. - if self.decision.is_none() && b == coin { - self.decide(b); - } - b - } else { - coin - }; + let def_bin_value = self.count_conf().1.definite(); + self.on_coin(coin, def_bin_value)?; + } - self.start_next_epoch(); + Ok(()) + } - self.estimated = Some(b); - self.send_bval(b)?; - let queued_msgs = replace(&mut self.incoming_queue, Vec::new()); - for (sender_id, msg) in queued_msgs { - self.handle_message(&sender_id, msg)?; + /// When the common coin has been computed, tries to decide on an output value, updates the + /// `Agreement` epoch and handles queued messages for the new epoch. + fn on_coin(&mut self, coin: bool, def_bin_value: Option) -> AgreementResult<()> { + let b = if let Some(b) = def_bin_value { + // Outputting a value is allowed only once. + if self.decision.is_none() && b == coin { + self.decide(b); } + b + } else { + coin + }; + + self.update_epoch(); + + self.estimated = Some(b); + self.send_bval(b)?; + let queued_msgs = replace(&mut self.incoming_queue, Vec::new()); + for (sender_id, msg) in queued_msgs { + self.handle_message(&sender_id, msg)?; } Ok(()) } - // Propagates Common Coin messages to the top level. + /// Computes the coin schedule for the current `Agreement` epoch. + fn coin_schedule(&self) -> CoinSchedule { + match self.epoch % 3 { + 0 => CoinSchedule::True, + 1 => CoinSchedule::False, + _ => CoinSchedule::Random, + } + } + + /// Propagates Common Coin messages to the top level. fn extend_common_coin(&mut self) { let epoch = self.epoch; self.messages.extend(self.common_coin.message_iter().map( @@ -423,7 +487,7 @@ impl Agreement { /// In general, we can't expect every good node to send the same `Aux` value, so waiting for N - /// f agreeing messages would not always terminate. We can, however, expect every good node to /// send an `Aux` value that will eventually end up in our `bin_values`. - fn count_aux(&self) -> usize { + fn count_aux(&self) -> (usize, BinValues) { let mut aux: BTreeMap<_, _> = self .received_aux .iter() @@ -438,7 +502,8 @@ impl Agreement { // Ensure that nodes are not counted twice. aux.extend(term); - aux.len() + let bin: BinValues = aux.values().map(|&&v| BinValues::from_bool(v)).collect(); + (aux.len(), bin) } /// Counts the number of received `Conf` messages. @@ -452,7 +517,7 @@ impl Agreement { (vals_cnt.count(), vals.cloned().collect()) } - fn start_next_epoch(&mut self) { + fn update_epoch(&mut self) { self.bin_values.clear(); self.received_bval.clear(); self.sent_bval.clear(); @@ -466,7 +531,10 @@ impl Agreement { *self.netinfo.node_index(&self.proposer_id).unwrap(), self.epoch, ); + // TODO: Don't spend time creating a `CommonCoin` instance in epochs where the common coin + // is known. self.common_coin = CommonCoin::new(self.netinfo.clone(), nonce); + self.coin_schedule = self.coin_schedule(); debug!( "{:?} Agreement instance {:?} started epoch {}", self.netinfo.our_uid(),