From 7824b7a0eaaf5cc73445d66b03a50245e63f8d70 Mon Sep 17 00:00:00 2001 From: c0gent Date: Thu, 2 Aug 2018 11:11:32 -0700 Subject: [PATCH] Reorganize `dynamic_honey_badger` and `agreement` modules slightly. * Move `CoinState` and `Agreement` definitions from `agreement/mod.rs` to `.../agreement.rs`. * Move `DynamicHoneyBadger` definition from `dynamic_honey_badger/mod.rs` to `.../dynamic_honey_badger.rs`. --- examples/network/connection.rs | 3 +- examples/network/node.rs | 3 +- examples/simulation.rs | 6 +- src/agreement/agreement.rs | 422 ++++++++++++++++ src/agreement/mod.rs | 428 +--------------- src/broadcast.rs | 3 +- src/common_subset.rs | 3 +- .../dynamic_honey_badger.rs | 419 ++++++++++++++++ src/dynamic_honey_badger/mod.rs | 460 ++---------------- src/dynamic_honey_badger/votes.rs | 13 +- src/honey_badger/honey_badger.rs | 3 +- tests/dynamic_honey_badger.rs | 3 +- tests/honey_badger.rs | 3 +- tests/sync_key_gen.rs | 6 +- 14 files changed, 895 insertions(+), 880 deletions(-) create mode 100644 src/agreement/agreement.rs create mode 100644 src/dynamic_honey_badger/dynamic_honey_badger.rs diff --git a/examples/network/connection.rs b/examples/network/connection.rs index 200aa5b..5353f8a 100644 --- a/examples/network/connection.rs +++ b/examples/network/connection.rs @@ -41,7 +41,6 @@ pub fn make( TcpStream::connect(address).expect("failed to connect") }; Connection::new(tcp_conn, there_str.to_string()) - }) - .collect(); + }).collect(); (here_str, connections) } diff --git a/examples/network/node.rs b/examples/network/node.rs index 0e151fb..3161f02 100644 --- a/examples/network/node.rs +++ b/examples/network/node.rs @@ -208,8 +208,7 @@ impl + PartialEq + Send + Sync + From> + .send(()) .map_err(|e| { error!("{}", e); - }) - .unwrap(); + }).unwrap(); process::exit(0); }) // end of thread scope diff --git a/examples/simulation.rs b/examples/simulation.rs index 6397fec..4998259 100644 --- a/examples/simulation.rs +++ b/examples/simulation.rs @@ -156,8 +156,7 @@ where target: msg.target, message: ser_msg, } - }) - .collect(); + }).collect(); let outputs = step .output .into_iter() @@ -204,8 +203,7 @@ where .map(|msg| { let ser_msg = bincode::serialize(&msg.message).expect("serialize"); (msg.target, ser_msg) - }) - .collect(); + }).collect(); self.time += start.elapsed() * self.hw_quality.cpu_factor / 100; let time = self.time; self.outputs diff --git a/src/agreement/agreement.rs b/src/agreement/agreement.rs new file mode 100644 index 0000000..186ed81 --- /dev/null +++ b/src/agreement/agreement.rs @@ -0,0 +1,422 @@ +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::sync::Arc; + +use itertools::Itertools; + +use super::bool_multimap::BoolMultimap; +use super::sbv_broadcast::{self, SbvBroadcast}; +use super::{AgreementContent, AgreementMessage, Error, Nonce, Result, Step}; +use agreement::bool_set::BoolSet; +use common_coin::{self, CommonCoin, CommonCoinMessage}; +use messaging::{DistAlgorithm, NetworkInfo, Target}; + +/// The state of the current epoch's common coin. In some epochs this is fixed, in others it starts +/// with in `InProgress`. +#[derive(Debug)] +enum CoinState { + /// The value was fixed in the current epoch, or the coin has already terminated. + Decided(bool), + /// The coin value is not known yet. + InProgress(CommonCoin), +} + +impl CoinState { + /// Returns the value, if this coin has already decided. + fn value(&self) -> Option { + match self { + CoinState::Decided(value) => Some(*value), + CoinState::InProgress(_) => None, + } + } +} + +impl From for CoinState { + fn from(value: bool) -> Self { + CoinState::Decided(value) + } +} + +/// Binary Agreement instance +#[derive(Debug)] +pub struct Agreement { + /// Shared network information. + netinfo: Arc>, + /// Session ID, e.g, the Honey Badger algorithm epoch. + session_id: u64, + /// The ID of the proposer of the value for this agreement instance. + proposer_id: NodeUid, + /// Agreement algorithm epoch. + epoch: u32, + /// This epoch's Synchronized Binary Value Broadcast instance. + sbv_broadcast: SbvBroadcast, + /// Received `Conf` messages. Reset on every epoch update. + received_conf: BTreeMap, + /// Received `Term` messages. Kept throughout epoch updates. These count as `BVal`, `Aux` and + /// `Conf` messages for all future epochs. + received_term: BoolMultimap, + /// The estimate of the decision value in the current epoch. + estimated: Option, + /// A permanent, latching copy of the output value. This copy is required because `output` can + /// be consumed using `DistAlgorithm::next_output` immediately after the instance finishing to + /// handle a message, in which case it would otherwise be unknown whether the output value was + /// ever there at all. While the output value will still be required in a later epoch to decide + /// the termination state. + decision: Option, + /// A cache for messages for future epochs that cannot be handled yet. + // TODO: Find a better solution for this; defend against spam. + incoming_queue: BTreeMap>, + /// The values we found in the first _N - f_ `Aux` messages that were in `bin_values`. + conf_values: Option, + /// The state of this epoch's common coin. + coin_state: CoinState, +} + +impl DistAlgorithm for Agreement { + type NodeUid = NodeUid; + type Input = bool; + type Output = bool; + type Message = AgreementMessage; + type Error = Error; + + fn input(&mut self, input: Self::Input) -> Result> { + self.set_input(input) + } + + /// Receive input from a remote node. + fn handle_message( + &mut self, + sender_id: &Self::NodeUid, + AgreementMessage { epoch, content }: Self::Message, + ) -> Result> { + if self.decision.is_some() || (epoch < self.epoch && content.can_expire()) { + // Message is obsolete: We are already in a later epoch or terminated. + Ok(Step::default()) + } else if epoch > self.epoch { + // Message is for a later epoch. We can't handle that yet. + let queue = self.incoming_queue.entry(epoch).or_insert_with(Vec::new); + queue.push((sender_id.clone(), content)); + Ok(Step::default()) + } else { + self.handle_message_content(sender_id, content) + } + } + + /// Whether the algorithm has terminated. + fn terminated(&self) -> bool { + self.decision.is_some() + } + + fn our_id(&self) -> &Self::NodeUid { + self.netinfo.our_uid() + } +} + +impl Agreement { + pub fn new( + netinfo: Arc>, + session_id: u64, + proposer_id: NodeUid, + ) -> Result { + if !netinfo.is_node_validator(&proposer_id) { + return Err(Error::UnknownProposer); + } + Ok(Agreement { + netinfo: netinfo.clone(), + session_id, + proposer_id, + epoch: 0, + sbv_broadcast: SbvBroadcast::new(netinfo), + received_conf: BTreeMap::new(), + received_term: BoolMultimap::default(), + estimated: None, + decision: None, + incoming_queue: BTreeMap::new(), + conf_values: None, + coin_state: CoinState::Decided(true), + }) + } + + /// Sets the input value for agreement. + fn set_input(&mut self, input: bool) -> Result> { + if self.epoch != 0 || self.estimated.is_some() { + return Err(Error::InputNotAccepted); + } + // Set the initial estimated value to the input value. + self.estimated = Some(input); + debug!("{:?}/{:?} Input {}", self.our_id(), self.proposer_id, input); + let sbvb_step = self.sbv_broadcast.input(input)?; + self.handle_sbvb_step(sbvb_step) + } + + /// Acceptance check to be performed before setting the input value. + pub fn accepts_input(&self) -> bool { + self.epoch == 0 && self.estimated.is_none() + } + + /// Dispatches the message content to the corresponding handling method. + fn handle_message_content( + &mut self, + sender_id: &NodeUid, + content: AgreementContent, + ) -> Result> { + match content { + AgreementContent::SbvBroadcast(msg) => self.handle_sbv_broadcast(sender_id, msg), + AgreementContent::Conf(v) => self.handle_conf(sender_id, v), + AgreementContent::Term(v) => self.handle_term(sender_id, v), + AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg), + } + } + + /// Handles a Synchroniced Binary Value Broadcast message. + fn handle_sbv_broadcast( + &mut self, + sender_id: &NodeUid, + msg: sbv_broadcast::Message, + ) -> Result> { + let sbvb_step = self.sbv_broadcast.handle_message(sender_id, msg)?; + self.handle_sbvb_step(sbvb_step) + } + + /// Handles a Synchronized Binary Value Broadcast step. On output, starts the `Conf` round or + /// decides. + fn handle_sbvb_step( + &mut self, + sbvb_step: sbv_broadcast::Step, + ) -> Result> { + let mut step = Step::default(); + let output = step.extend_with(sbvb_step, |msg| { + AgreementContent::SbvBroadcast(msg).with_epoch(self.epoch) + }); + if self.conf_values.is_some() { + return Ok(step); // The `Conf` round has already started. + } + if let Some(aux_vals) = output.into_iter().next() { + // Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...` + match self.coin_state { + CoinState::Decided(_) => { + self.conf_values = Some(aux_vals); + step.extend(self.try_update_epoch()?) + } + CoinState::InProgress(_) => { + // Start the `Conf` message round. + step.extend(self.send_conf(aux_vals)?) + } + } + } + Ok(step) + } + + /// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have + /// been received, updates the epoch or decides. + fn handle_conf(&mut self, sender_id: &NodeUid, v: BoolSet) -> Result> { + self.received_conf.insert(sender_id.clone(), v); + self.try_finish_conf_round() + } + + /// Handles a `Term(v)` message. If we haven't yet decided on a value and there are more than + /// _f_ such messages with the same value from different nodes, performs expedite termination: + /// decides on `v`, broadcasts `Term(v)` and terminates the instance. + fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Result> { + self.received_term[b].insert(sender_id.clone()); + // Check for the expedite termination condition. + if self.decision.is_some() { + Ok(Step::default()) + } else if self.received_term[b].len() > self.netinfo.num_faulty() { + Ok(self.decide(b)) + } else { + // Otherwise handle the `Term` as a `BVal`, `Aux` and `Conf`. + let mut sbvb_step = self.sbv_broadcast.handle_bval(sender_id, b)?; + sbvb_step.extend(self.sbv_broadcast.handle_aux(sender_id, b)?); + let mut step = self.handle_sbvb_step(sbvb_step)?; + step.extend(self.handle_conf(sender_id, BoolSet::from(b))?); + Ok(step) + } + } + + /// Handles a Common Coin message. If there is output from Common Coin, starts the next + /// epoch. The function may output a decision value. + fn handle_coin( + &mut self, + sender_id: &NodeUid, + msg: CommonCoinMessage, + ) -> Result> { + let coin_step = match self.coin_state { + CoinState::Decided(_) => return Ok(Step::default()), // Coin value is already decided. + CoinState::InProgress(ref mut common_coin) => common_coin + .handle_message(sender_id, msg) + .map_err(Error::HandleCoinCommonCoin)?, + }; + self.on_coin_step(coin_step) + } + + /// Multicasts a `Conf(values)` message, and handles it. + fn send_conf(&mut self, values: BoolSet) -> Result> { + if self.conf_values.is_some() { + // Only one `Conf` message is allowed in an epoch. + return Ok(Step::default()); + } + + // Trigger the start of the `Conf` round. + self.conf_values = Some(values); + + if !self.netinfo.is_validator() { + return Ok(self.try_finish_conf_round()?); + } + + self.send(AgreementContent::Conf(values)) + } + + /// Multicasts and handles a message. Does nothing if we are only an observer. + fn send(&mut self, content: AgreementContent) -> Result> { + if !self.netinfo.is_validator() { + return Ok(Step::default()); + } + let mut step: Step<_> = Target::All + .message(content.clone().with_epoch(self.epoch)) + .into(); + let our_uid = &self.netinfo.our_uid().clone(); + step.extend(self.handle_message_content(our_uid, content)?); + Ok(step) + } + + /// Handles a step returned from the `CommonCoin`. + fn on_coin_step( + &mut self, + coin_step: common_coin::Step, + ) -> Result> { + let mut step = Step::default(); + let epoch = self.epoch; + let to_msg = |c_msg| AgreementContent::Coin(Box::new(c_msg)).with_epoch(epoch); + let coin_output = step.extend_with(coin_step, to_msg); + if let Some(coin) = coin_output.into_iter().next() { + self.coin_state = coin.into(); + step.extend(self.try_update_epoch()?); + } + Ok(step) + } + + /// If this epoch's coin value or conf values are not known yet, does nothing, otherwise + /// updates the epoch or decides. + /// + /// With two conf values, the next epoch's estimate is the coin value. If there is only one conf + /// value and that disagrees with the coin, the conf value is the next epoch's estimate. If + /// the unique conf value agrees with the coin, terminates and decides on that value. + fn try_update_epoch(&mut self) -> Result> { + if self.decision.is_some() { + // Avoid an infinite regression without making an Agreement step. + return Ok(Step::default()); + } + let coin = match self.coin_state.value() { + None => return Ok(Step::default()), // Still waiting for coin value. + Some(coin) => coin, + }; + let def_bin_value = match self.conf_values { + None => return Ok(Step::default()), // Still waiting for conf value. + Some(ref values) => values.definite(), + }; + + if Some(coin) == def_bin_value { + Ok(self.decide(coin)) + } else { + self.update_epoch(def_bin_value.unwrap_or(coin)) + } + } + + /// Creates the initial coin state for the current epoch, i.e. sets it to the predetermined + /// value, or initializes a `CommonCoin` instance. + fn coin_state(&self) -> CoinState { + match self.epoch % 3 { + 0 => CoinState::Decided(true), + 1 => CoinState::Decided(false), + _ => { + let nonce = Nonce::new( + self.netinfo.invocation_id().as_ref(), + self.session_id, + self.netinfo.node_index(&self.proposer_id).unwrap(), + self.epoch, + ); + CoinState::InProgress(CommonCoin::new(self.netinfo.clone(), nonce)) + } + } + } + + /// Decides on a value and broadcasts a `Term` message with that value. + fn decide(&mut self, b: bool) -> Step { + if self.decision.is_some() { + return Step::default(); + } + // Output the agreement value. + let mut step = Step::default(); + step.output.push_back(b); + // Latch the decided state. + self.decision = Some(b); + debug!( + "{:?}/{:?} (is_validator: {}) decision: {}", + self.netinfo.our_uid(), + self.proposer_id, + self.netinfo.is_validator(), + b + ); + if self.netinfo.is_validator() { + let msg = AgreementContent::Term(b).with_epoch(self.epoch + 1); + step.messages.push_back(Target::All.message(msg)); + } + step + } + + /// Checks whether the _N - f_ `Conf` messages have arrived, and if so, activates the coin. + fn try_finish_conf_round(&mut self) -> Result> { + if self.conf_values.is_none() || self.count_conf() < self.netinfo.num_correct() { + return Ok(Step::default()); + } + + // Invoke the common coin. + let coin_step = match self.coin_state { + CoinState::Decided(_) => return Ok(Step::default()), // Coin has already decided. + CoinState::InProgress(ref mut common_coin) => common_coin + .input(()) + .map_err(Error::TryFinishConfRoundCommonCoin)?, + }; + let mut step = self.on_coin_step(coin_step)?; + step.extend(self.try_update_epoch()?); + Ok(step) + } + + /// Counts the number of received `Conf` messages with values in `bin_values`. + fn count_conf(&self) -> usize { + let is_bin_val = |conf: &&BoolSet| conf.is_subset(self.sbv_broadcast.bin_values()); + self.received_conf.values().filter(is_bin_val).count() + } + + /// Increments the epoch, sets the new estimate and handles queued messages. + fn update_epoch(&mut self, b: bool) -> Result> { + self.sbv_broadcast.clear(&self.received_term); + self.received_conf.clear(); + for (v, id) in &self.received_term { + self.received_conf.insert(id.clone(), BoolSet::from(v)); + } + self.conf_values = None; + self.epoch += 1; + self.coin_state = self.coin_state(); + debug!( + "{:?} Agreement instance {:?} started epoch {}, {} terminated", + self.netinfo.our_uid(), + self.proposer_id, + self.epoch, + self.received_conf.len(), + ); + + self.estimated = Some(b); + let sbvb_step = self.sbv_broadcast.input(b)?; + let mut step = self.handle_sbvb_step(sbvb_step)?; + let queued_msgs = Itertools::flatten(self.incoming_queue.remove(&self.epoch).into_iter()); + for (sender_id, content) in queued_msgs { + step.extend(self.handle_message_content(&sender_id, content)?); + if self.decision.is_some() { + break; + } + } + Ok(step) + } +} diff --git a/src/agreement/mod.rs b/src/agreement/mod.rs index 2226e28..a4f618a 100644 --- a/src/agreement/mod.rs +++ b/src/agreement/mod.rs @@ -63,22 +63,18 @@ //! * After _f + 1_ nodes have sent us their coin shares, we receive the coin output and assign it //! to `s`. +mod agreement; mod bool_multimap; pub mod bool_set; mod sbv_broadcast; use rand; -use std::collections::BTreeMap; -use std::fmt::Debug; -use std::sync::Arc; -use itertools::Itertools; +use self::bool_set::BoolSet; +use common_coin::{self, CommonCoinMessage}; +use messaging; -use self::bool_multimap::BoolMultimap; -use self::sbv_broadcast::SbvBroadcast; -use agreement::bool_set::BoolSet; -use common_coin::{self, CommonCoin, CommonCoinMessage}; -use messaging::{self, DistAlgorithm, NetworkInfo, Target}; +pub use self::agreement::Agreement; /// An agreement error. #[derive(Clone, Eq, PartialEq, Debug, Fail)] @@ -96,6 +92,8 @@ pub enum Error { /// An agreement result. pub type Result = ::std::result::Result; +pub type Step = messaging::Step>; + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum AgreementContent { /// Synchronized Binary Value Broadcast message. @@ -150,418 +148,6 @@ impl rand::Rand for AgreementContent { } } -/// The state of the current epoch's common coin. In some epochs this is fixed, in others it starts -/// with in `InProgress`. -#[derive(Debug)] -enum CoinState { - /// The value was fixed in the current epoch, or the coin has already terminated. - Decided(bool), - /// The coin value is not known yet. - InProgress(CommonCoin), -} - -impl CoinState { - /// Returns the value, if this coin has already decided. - fn value(&self) -> Option { - match self { - CoinState::Decided(value) => Some(*value), - CoinState::InProgress(_) => None, - } - } -} - -impl From for CoinState { - fn from(value: bool) -> Self { - CoinState::Decided(value) - } -} - -/// Binary Agreement instance -#[derive(Debug)] -pub struct Agreement { - /// Shared network information. - netinfo: Arc>, - /// Session ID, e.g, the Honey Badger algorithm epoch. - session_id: u64, - /// The ID of the proposer of the value for this agreement instance. - proposer_id: NodeUid, - /// Agreement algorithm epoch. - epoch: u32, - /// This epoch's Synchronized Binary Value Broadcast instance. - sbv_broadcast: SbvBroadcast, - /// Received `Conf` messages. Reset on every epoch update. - received_conf: BTreeMap, - /// Received `Term` messages. Kept throughout epoch updates. These count as `BVal`, `Aux` and - /// `Conf` messages for all future epochs. - received_term: BoolMultimap, - /// The estimate of the decision value in the current epoch. - estimated: Option, - /// A permanent, latching copy of the output value. This copy is required because `output` can - /// be consumed using `DistAlgorithm::next_output` immediately after the instance finishing to - /// handle a message, in which case it would otherwise be unknown whether the output value was - /// ever there at all. While the output value will still be required in a later epoch to decide - /// the termination state. - decision: Option, - /// A cache for messages for future epochs that cannot be handled yet. - // TODO: Find a better solution for this; defend against spam. - incoming_queue: BTreeMap>, - /// The values we found in the first _N - f_ `Aux` messages that were in `bin_values`. - conf_values: Option, - /// The state of this epoch's common coin. - coin_state: CoinState, -} - -pub type Step = messaging::Step>; - -impl DistAlgorithm for Agreement { - type NodeUid = NodeUid; - type Input = bool; - type Output = bool; - type Message = AgreementMessage; - type Error = Error; - - fn input(&mut self, input: Self::Input) -> Result> { - self.set_input(input) - } - - /// Receive input from a remote node. - fn handle_message( - &mut self, - sender_id: &Self::NodeUid, - AgreementMessage { epoch, content }: Self::Message, - ) -> Result> { - if self.decision.is_some() || (epoch < self.epoch && content.can_expire()) { - // Message is obsolete: We are already in a later epoch or terminated. - Ok(Step::default()) - } else if epoch > self.epoch { - // Message is for a later epoch. We can't handle that yet. - let queue = self.incoming_queue.entry(epoch).or_insert_with(Vec::new); - queue.push((sender_id.clone(), content)); - Ok(Step::default()) - } else { - self.handle_message_content(sender_id, content) - } - } - - /// Whether the algorithm has terminated. - fn terminated(&self) -> bool { - self.decision.is_some() - } - - fn our_id(&self) -> &Self::NodeUid { - self.netinfo.our_uid() - } -} - -impl Agreement { - pub fn new( - netinfo: Arc>, - session_id: u64, - proposer_id: NodeUid, - ) -> Result { - if !netinfo.is_node_validator(&proposer_id) { - return Err(Error::UnknownProposer); - } - Ok(Agreement { - netinfo: netinfo.clone(), - session_id, - proposer_id, - epoch: 0, - sbv_broadcast: SbvBroadcast::new(netinfo), - received_conf: BTreeMap::new(), - received_term: BoolMultimap::default(), - estimated: None, - decision: None, - incoming_queue: BTreeMap::new(), - conf_values: None, - coin_state: CoinState::Decided(true), - }) - } - - /// Sets the input value for agreement. - fn set_input(&mut self, input: bool) -> Result> { - if self.epoch != 0 || self.estimated.is_some() { - return Err(Error::InputNotAccepted); - } - // Set the initial estimated value to the input value. - self.estimated = Some(input); - debug!("{:?}/{:?} Input {}", self.our_id(), self.proposer_id, input); - let sbvb_step = self.sbv_broadcast.input(input)?; - self.handle_sbvb_step(sbvb_step) - } - - /// Acceptance check to be performed before setting the input value. - pub fn accepts_input(&self) -> bool { - self.epoch == 0 && self.estimated.is_none() - } - - /// Dispatches the message content to the corresponding handling method. - fn handle_message_content( - &mut self, - sender_id: &NodeUid, - content: AgreementContent, - ) -> Result> { - match content { - AgreementContent::SbvBroadcast(msg) => self.handle_sbv_broadcast(sender_id, msg), - AgreementContent::Conf(v) => self.handle_conf(sender_id, v), - AgreementContent::Term(v) => self.handle_term(sender_id, v), - AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg), - } - } - - /// Handles a Synchroniced Binary Value Broadcast message. - fn handle_sbv_broadcast( - &mut self, - sender_id: &NodeUid, - msg: sbv_broadcast::Message, - ) -> Result> { - let sbvb_step = self.sbv_broadcast.handle_message(sender_id, msg)?; - self.handle_sbvb_step(sbvb_step) - } - - /// Handles a Synchronized Binary Value Broadcast step. On output, starts the `Conf` round or - /// decides. - fn handle_sbvb_step( - &mut self, - sbvb_step: sbv_broadcast::Step, - ) -> Result> { - let mut step = Step::default(); - let output = step.extend_with(sbvb_step, |msg| { - AgreementContent::SbvBroadcast(msg).with_epoch(self.epoch) - }); - if self.conf_values.is_some() { - return Ok(step); // The `Conf` round has already started. - } - if let Some(aux_vals) = output.into_iter().next() { - // Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...` - match self.coin_state { - CoinState::Decided(_) => { - self.conf_values = Some(aux_vals); - step.extend(self.try_update_epoch()?) - } - CoinState::InProgress(_) => { - // Start the `Conf` message round. - step.extend(self.send_conf(aux_vals)?) - } - } - } - Ok(step) - } - - /// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have - /// been received, updates the epoch or decides. - fn handle_conf(&mut self, sender_id: &NodeUid, v: BoolSet) -> Result> { - self.received_conf.insert(sender_id.clone(), v); - self.try_finish_conf_round() - } - - /// Handles a `Term(v)` message. If we haven't yet decided on a value and there are more than - /// _f_ such messages with the same value from different nodes, performs expedite termination: - /// decides on `v`, broadcasts `Term(v)` and terminates the instance. - fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Result> { - self.received_term[b].insert(sender_id.clone()); - // Check for the expedite termination condition. - if self.decision.is_some() { - Ok(Step::default()) - } else if self.received_term[b].len() > self.netinfo.num_faulty() { - Ok(self.decide(b)) - } else { - // Otherwise handle the `Term` as a `BVal`, `Aux` and `Conf`. - let mut sbvb_step = self.sbv_broadcast.handle_bval(sender_id, b)?; - sbvb_step.extend(self.sbv_broadcast.handle_aux(sender_id, b)?); - let mut step = self.handle_sbvb_step(sbvb_step)?; - step.extend(self.handle_conf(sender_id, BoolSet::from(b))?); - Ok(step) - } - } - - /// Handles a Common Coin message. If there is output from Common Coin, starts the next - /// epoch. The function may output a decision value. - fn handle_coin( - &mut self, - sender_id: &NodeUid, - msg: CommonCoinMessage, - ) -> Result> { - let coin_step = match self.coin_state { - CoinState::Decided(_) => return Ok(Step::default()), // Coin value is already decided. - CoinState::InProgress(ref mut common_coin) => common_coin - .handle_message(sender_id, msg) - .map_err(Error::HandleCoinCommonCoin)?, - }; - self.on_coin_step(coin_step) - } - - /// Multicasts a `Conf(values)` message, and handles it. - fn send_conf(&mut self, values: BoolSet) -> Result> { - if self.conf_values.is_some() { - // Only one `Conf` message is allowed in an epoch. - return Ok(Step::default()); - } - - // Trigger the start of the `Conf` round. - self.conf_values = Some(values); - - if !self.netinfo.is_validator() { - return Ok(self.try_finish_conf_round()?); - } - - self.send(AgreementContent::Conf(values)) - } - - /// Multicasts and handles a message. Does nothing if we are only an observer. - fn send(&mut self, content: AgreementContent) -> Result> { - if !self.netinfo.is_validator() { - return Ok(Step::default()); - } - let mut step: Step<_> = Target::All - .message(content.clone().with_epoch(self.epoch)) - .into(); - let our_uid = &self.netinfo.our_uid().clone(); - step.extend(self.handle_message_content(our_uid, content)?); - Ok(step) - } - - /// Handles a step returned from the `CommonCoin`. - fn on_coin_step( - &mut self, - coin_step: common_coin::Step, - ) -> Result> { - let mut step = Step::default(); - let epoch = self.epoch; - let to_msg = |c_msg| AgreementContent::Coin(Box::new(c_msg)).with_epoch(epoch); - let coin_output = step.extend_with(coin_step, to_msg); - if let Some(coin) = coin_output.into_iter().next() { - self.coin_state = coin.into(); - step.extend(self.try_update_epoch()?); - } - Ok(step) - } - - /// If this epoch's coin value or conf values are not known yet, does nothing, otherwise - /// updates the epoch or decides. - /// - /// With two conf values, the next epoch's estimate is the coin value. If there is only one conf - /// value and that disagrees with the coin, the conf value is the next epoch's estimate. If - /// the unique conf value agrees with the coin, terminates and decides on that value. - fn try_update_epoch(&mut self) -> Result> { - if self.decision.is_some() { - // Avoid an infinite regression without making an Agreement step. - return Ok(Step::default()); - } - let coin = match self.coin_state.value() { - None => return Ok(Step::default()), // Still waiting for coin value. - Some(coin) => coin, - }; - let def_bin_value = match self.conf_values { - None => return Ok(Step::default()), // Still waiting for conf value. - Some(ref values) => values.definite(), - }; - - if Some(coin) == def_bin_value { - Ok(self.decide(coin)) - } else { - self.update_epoch(def_bin_value.unwrap_or(coin)) - } - } - - /// Creates the initial coin state for the current epoch, i.e. sets it to the predetermined - /// value, or initializes a `CommonCoin` instance. - fn coin_state(&self) -> CoinState { - match self.epoch % 3 { - 0 => CoinState::Decided(true), - 1 => CoinState::Decided(false), - _ => { - let nonce = Nonce::new( - self.netinfo.invocation_id().as_ref(), - self.session_id, - self.netinfo.node_index(&self.proposer_id).unwrap(), - self.epoch, - ); - CoinState::InProgress(CommonCoin::new(self.netinfo.clone(), nonce)) - } - } - } - - /// Decides on a value and broadcasts a `Term` message with that value. - fn decide(&mut self, b: bool) -> Step { - if self.decision.is_some() { - return Step::default(); - } - // Output the agreement value. - let mut step = Step::default(); - step.output.push_back(b); - // Latch the decided state. - self.decision = Some(b); - debug!( - "{:?}/{:?} (is_validator: {}) decision: {}", - self.netinfo.our_uid(), - self.proposer_id, - self.netinfo.is_validator(), - b - ); - if self.netinfo.is_validator() { - let msg = AgreementContent::Term(b).with_epoch(self.epoch + 1); - step.messages.push_back(Target::All.message(msg)); - } - step - } - - /// Checks whether the _N - f_ `Conf` messages have arrived, and if so, activates the coin. - fn try_finish_conf_round(&mut self) -> Result> { - if self.conf_values.is_none() || self.count_conf() < self.netinfo.num_correct() { - return Ok(Step::default()); - } - - // Invoke the common coin. - let coin_step = match self.coin_state { - CoinState::Decided(_) => return Ok(Step::default()), // Coin has already decided. - CoinState::InProgress(ref mut common_coin) => common_coin - .input(()) - .map_err(Error::TryFinishConfRoundCommonCoin)?, - }; - let mut step = self.on_coin_step(coin_step)?; - step.extend(self.try_update_epoch()?); - Ok(step) - } - - /// Counts the number of received `Conf` messages with values in `bin_values`. - fn count_conf(&self) -> usize { - let is_bin_val = |conf: &&BoolSet| conf.is_subset(self.sbv_broadcast.bin_values()); - self.received_conf.values().filter(is_bin_val).count() - } - - /// Increments the epoch, sets the new estimate and handles queued messages. - fn update_epoch(&mut self, b: bool) -> Result> { - self.sbv_broadcast.clear(&self.received_term); - self.received_conf.clear(); - for (v, id) in &self.received_term { - self.received_conf.insert(id.clone(), BoolSet::from(v)); - } - self.conf_values = None; - self.epoch += 1; - self.coin_state = self.coin_state(); - debug!( - "{:?} Agreement instance {:?} started epoch {}, {} terminated", - self.netinfo.our_uid(), - self.proposer_id, - self.epoch, - self.received_conf.len(), - ); - - self.estimated = Some(b); - let sbvb_step = self.sbv_broadcast.input(b)?; - let mut step = self.handle_sbvb_step(sbvb_step)?; - let queued_msgs = Itertools::flatten(self.incoming_queue.remove(&self.epoch).into_iter()); - for (sender_id, content) in queued_msgs { - step.extend(self.handle_message_content(&sender_id, content)?); - if self.decision.is_some() { - break; - } - } - Ok(step) - } -} - #[derive(Clone, Debug)] struct Nonce(Vec); diff --git a/src/broadcast.rs b/src/broadcast.rs index 9026e7a..449aa0b 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -541,8 +541,7 @@ impl Broadcast { None } }) - }) - .collect(); + }).collect(); if let Some(value) = decode_from_shards(&mut leaf_values, &self.coding, self.data_shard_num, hash) { diff --git a/src/common_subset.rs b/src/common_subset.rs index 04738e5..80a3489 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -129,7 +129,8 @@ impl CommonSubset { for proposer_id in netinfo.all_uids() { broadcast_instances.insert( proposer_id.clone(), - Broadcast::new(netinfo.clone(), proposer_id.clone()).map_err(Error::NewBroadcast)?, + Broadcast::new(netinfo.clone(), proposer_id.clone()) + .map_err(Error::NewBroadcast)?, ); } diff --git a/src/dynamic_honey_badger/dynamic_honey_badger.rs b/src/dynamic_honey_badger/dynamic_honey_badger.rs new file mode 100644 index 0000000..dc29089 --- /dev/null +++ b/src/dynamic_honey_badger/dynamic_honey_badger.rs @@ -0,0 +1,419 @@ +use rand::Rand; +use std::fmt::Debug; +use std::hash::Hash; +use std::mem; +use std::sync::Arc; + +use bincode; +use crypto::Signature; +use serde::{Deserialize, Serialize}; + +use super::votes::{SignedVote, VoteCounter}; +use super::{ + Batch, Change, ChangeState, DynamicHoneyBadgerBuilder, Error, ErrorKind, Input, + InternalContrib, KeyGenMessage, KeyGenState, Message, Result, SignedKeyGenMsg, Step, +}; +use fault_log::{Fault, FaultKind, FaultLog}; +use honey_badger::{self, HoneyBadger, Message as HbMessage}; +use messaging::{DistAlgorithm, NetworkInfo, Target}; +use sync_key_gen::{Ack, Part, PartOutcome, SyncKeyGen}; + +/// A Honey Badger instance that can handle adding and removing nodes. +#[derive(Debug)] +pub struct DynamicHoneyBadger { + /// Shared network data. + pub(super) netinfo: NetworkInfo, + /// The maximum number of future epochs for which we handle messages simultaneously. + pub(super) max_future_epochs: usize, + /// The first epoch after the latest node change. + pub(super) start_epoch: u64, + /// The buffer and counter for the pending and committed change votes. + pub(super) vote_counter: VoteCounter, + /// Pending node transactions that we will propose in the next epoch. + pub(super) key_gen_msg_buffer: Vec>, + /// The `HoneyBadger` instance with the current set of nodes. + pub(super) honey_badger: HoneyBadger, NodeUid>, + /// The current key generation process, and the change it applies to. + pub(super) key_gen_state: Option>, + /// A queue for messages from future epochs that cannot be handled yet. + pub(super) incoming_queue: Vec<(NodeUid, Message)>, +} + +impl DistAlgorithm for DynamicHoneyBadger +where + C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash, + NodeUid: Eq + Ord + Clone + Serialize + for<'r> Deserialize<'r> + Debug + Hash + Rand, +{ + type NodeUid = NodeUid; + type Input = Input; + type Output = Batch; + type Message = Message; + type Error = Error; + + fn input(&mut self, input: Self::Input) -> Result> { + // User contributions are forwarded to `HoneyBadger` right away. Votes are signed and + // broadcast. + match input { + Input::User(contrib) => self.propose(contrib), + Input::Change(change) => self.vote_for(change), + } + } + + fn handle_message( + &mut self, + sender_id: &NodeUid, + message: Self::Message, + ) -> Result> { + let epoch = message.start_epoch(); + if epoch < self.start_epoch { + // Obsolete message. + Ok(Step::default()) + } else if epoch > self.start_epoch { + // Message cannot be handled yet. Save it for later. + let entry = (sender_id.clone(), message); + self.incoming_queue.push(entry); + Ok(Step::default()) + } else { + match message { + Message::HoneyBadger(_, hb_msg) => { + self.handle_honey_badger_message(sender_id, hb_msg) + } + Message::KeyGen(_, kg_msg, sig) => self + .handle_key_gen_message(sender_id, kg_msg, *sig) + .map(FaultLog::into), + Message::SignedVote(signed_vote) => self + .vote_counter + .add_pending_vote(sender_id, signed_vote) + .map(FaultLog::into), + } + } + } + + fn terminated(&self) -> bool { + false + } + + fn our_id(&self) -> &NodeUid { + self.netinfo.our_uid() + } +} + +impl DynamicHoneyBadger +where + C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash, + NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash + Rand, +{ + /// Returns a new `DynamicHoneyBadgerBuilder`. + pub fn builder() -> DynamicHoneyBadgerBuilder { + DynamicHoneyBadgerBuilder::new() + } + + /// Returns `true` if input for the current epoch has already been provided. + pub fn has_input(&self) -> bool { + self.honey_badger.has_input() + } + + /// Proposes a contribution in the current epoch. + pub fn propose(&mut self, contrib: C) -> Result> { + let step = self + .honey_badger + .input(InternalContrib { + contrib, + key_gen_messages: self.key_gen_msg_buffer.clone(), + votes: self.vote_counter.pending_votes().cloned().collect(), + }).map_err(ErrorKind::ProposeHoneyBadger)?; + self.process_output(step) + } + + /// Cast a vote to change the set of validators. + pub fn vote_for(&mut self, change: Change) -> Result> { + if !self.netinfo.is_validator() { + return Ok(Step::default()); // TODO: Return an error? + } + let signed_vote = self.vote_counter.sign_vote_for(change)?.clone(); + let msg = Message::SignedVote(signed_vote); + Ok(Target::All.message(msg).into()) + } + + /// Returns the information about the node IDs in the network, and the cryptographic keys. + pub fn netinfo(&self) -> &NetworkInfo { + &self.netinfo + } + + /// Returns `true` if we should make our contribution for the next epoch, even if we don't have + /// content ourselves, to avoid stalling the network. + /// + /// By proposing only if this returns `true`, you can prevent an adversary from making the + /// network output empty baches indefinitely, but it also means that the network won't advance + /// if fewer than _f + 1_ nodes have pending contributions. + pub fn should_propose(&self) -> bool { + if self.has_input() { + return false; // We have already proposed. + } + if self.honey_badger.received_proposals() > self.netinfo.num_faulty() { + return true; // At least one correct node wants to move on to the next epoch. + } + let is_our_vote = |signed_vote: &SignedVote<_>| signed_vote.voter() == self.our_id(); + if self.vote_counter.pending_votes().any(is_our_vote) { + return true; // We have pending input to vote for a validator change. + } + let kgs = match self.key_gen_state { + None => return false, // No ongoing key generation. + Some(ref kgs) => kgs, + }; + // If either we or the candidate have a pending key gen message, we should propose. + let ours_or_candidates = |msg: &SignedKeyGenMsg<_>| { + msg.1 == *self.our_id() || Some(&msg.1) == kgs.change.candidate() + }; + self.key_gen_msg_buffer.iter().any(ours_or_candidates) + } + + /// Handles a message for the `HoneyBadger` instance. + fn handle_honey_badger_message( + &mut self, + sender_id: &NodeUid, + message: HbMessage, + ) -> Result> { + if !self.netinfo.is_node_validator(sender_id) { + info!("Unknown sender {:?} of message {:?}", sender_id, message); + return Err(ErrorKind::UnknownSender.into()); + } + // Handle the message. + let step = self + .honey_badger + .handle_message(sender_id, message) + .map_err(ErrorKind::HandleHoneyBadgerMessageHoneyBadger)?; + self.process_output(step) + } + + /// Handles a vote or key generation message and tries to commit it as a transaction. These + /// messages are only handled once they appear in a batch output from Honey Badger. + fn handle_key_gen_message( + &mut self, + sender_id: &NodeUid, + kg_msg: KeyGenMessage, + sig: Signature, + ) -> Result> { + if !self.verify_signature(sender_id, &sig, &kg_msg)? { + info!("Invalid signature from {:?} for: {:?}.", sender_id, kg_msg); + let fault_kind = FaultKind::InvalidKeyGenMessageSignature; + return Ok(Fault::new(sender_id.clone(), fault_kind).into()); + } + let kgs = match self.key_gen_state { + Some(ref mut kgs) => kgs, + None => { + info!( + "Unexpected key gen message from {:?}: {:?}.", + sender_id, kg_msg + ); + return Ok(Fault::new(sender_id.clone(), FaultKind::UnexpectedKeyGenMessage).into()); + } + }; + + // If the joining node is correct, it will send at most (N + 1)² + 1 key generation + // messages. + if Some(sender_id) == kgs.change.candidate() { + let n = self.netinfo.num_nodes() + 1; + if kgs.candidate_msg_count > n * n { + info!( + "Too many key gen messages from candidate {:?}: {:?}.", + sender_id, kg_msg + ); + let fault_kind = FaultKind::TooManyCandidateKeyGenMessages; + return Ok(Fault::new(sender_id.clone(), fault_kind).into()); + } + kgs.candidate_msg_count += 1; + } + + let tx = SignedKeyGenMsg(self.start_epoch, sender_id.clone(), kg_msg, sig); + self.key_gen_msg_buffer.push(tx); + Ok(FaultLog::default()) + } + + /// Processes all pending batches output by Honey Badger. + fn process_output( + &mut self, + hb_step: honey_badger::Step, NodeUid>, + ) -> Result> { + let mut step: Step = Step::default(); + let start_epoch = self.start_epoch; + let output = step.extend_with(hb_step, |hb_msg| Message::HoneyBadger(start_epoch, hb_msg)); + for hb_batch in output { + // Create the batch we output ourselves. It will contain the _user_ transactions of + // `hb_batch`, and the current change state. + let mut batch = Batch::new(hb_batch.epoch + self.start_epoch); + + // Add the user transactions to `batch` and handle votes and DKG messages. + for (id, int_contrib) in hb_batch.contributions { + let InternalContrib { + votes, + key_gen_messages, + contrib, + } = int_contrib; + step.fault_log + .extend(self.vote_counter.add_committed_votes(&id, votes)?); + batch.contributions.insert(id.clone(), contrib); + self.key_gen_msg_buffer + .retain(|skgm| !key_gen_messages.contains(skgm)); + for SignedKeyGenMsg(epoch, s_id, kg_msg, sig) in key_gen_messages { + if epoch < self.start_epoch { + info!("Obsolete key generation message: {:?}.", kg_msg); + continue; + } + if !self.verify_signature(&s_id, &sig, &kg_msg)? { + info!( + "Invalid signature in {:?}'s batch from {:?} for: {:?}.", + id, s_id, kg_msg + ); + let fault_kind = FaultKind::InvalidKeyGenMessageSignature; + step.fault_log.append(id.clone(), fault_kind); + continue; + } + step.extend(match kg_msg { + KeyGenMessage::Part(part) => self.handle_part(&s_id, part)?, + KeyGenMessage::Ack(ack) => self.handle_ack(&s_id, ack)?.into(), + }); + } + } + + if let Some(kgs) = self.take_ready_key_gen() { + // If DKG completed, apply the change, restart Honey Badger, and inform the user. + debug!("{:?} DKG for {:?} complete!", self.our_id(), kgs.change); + self.netinfo = kgs.key_gen.into_network_info(); + self.restart_honey_badger(batch.epoch + 1); + batch.set_change(ChangeState::Complete(kgs.change), &self.netinfo); + } else if let Some(change) = self.vote_counter.compute_winner().cloned() { + // If there is a new change, restart DKG. Inform the user about the current change. + step.extend(self.update_key_gen(batch.epoch + 1, &change)?); + batch.set_change(ChangeState::InProgress(change), &self.netinfo); + } + step.output.push_back(batch); + } + // If `start_epoch` changed, we can now handle some queued messages. + if start_epoch < self.start_epoch { + let queue = mem::replace(&mut self.incoming_queue, Vec::new()); + for (sender_id, msg) in queue { + step.extend(self.handle_message(&sender_id, msg)?); + } + } + Ok(step) + } + + /// If the winner of the vote has changed, restarts Key Generation for the set of nodes implied + /// by the current change. + pub(super) fn update_key_gen( + &mut self, + epoch: u64, + change: &Change, + ) -> Result> { + if self.key_gen_state.as_ref().map(|kgs| &kgs.change) == Some(change) { + return Ok(Step::default()); // The change is the same as before. Continue DKG as is. + } + debug!("{:?} Restarting DKG for {:?}.", self.our_id(), change); + // Use the existing key shares - with the change applied - as keys for DKG. + let mut pub_keys = self.netinfo.public_key_map().clone(); + if match *change { + Change::Remove(ref id) => pub_keys.remove(id).is_none(), + Change::Add(ref id, ref pk) => pub_keys.insert(id.clone(), pk.clone()).is_some(), + } { + info!("{:?} No-op change: {:?}", self.our_id(), change); + } + self.restart_honey_badger(epoch); + // TODO: This needs to be the same as `num_faulty` will be in the _new_ + // `NetworkInfo` if the change goes through. It would be safer to deduplicate. + let threshold = (pub_keys.len() - 1) / 3; + let sk = self.netinfo.secret_key().clone(); + let our_uid = self.our_id().clone(); + let (key_gen, part) = SyncKeyGen::new(our_uid, sk, pub_keys, threshold); + self.key_gen_state = Some(KeyGenState::new(key_gen, change.clone())); + if let Some(part) = part { + self.send_transaction(KeyGenMessage::Part(part)) + } else { + Ok(Step::default()) + } + } + + /// Starts a new `HoneyBadger` instance and resets the vote counter. + fn restart_honey_badger(&mut self, epoch: u64) { + self.start_epoch = epoch; + self.key_gen_msg_buffer.retain(|kg_msg| kg_msg.0 >= epoch); + let netinfo = Arc::new(self.netinfo.clone()); + let counter = VoteCounter::new(netinfo.clone(), epoch); + mem::replace(&mut self.vote_counter, counter); + self.honey_badger = HoneyBadger::builder(netinfo) + .max_future_epochs(self.max_future_epochs) + .build(); + } + + /// Handles a `Part` message that was output by Honey Badger. + fn handle_part(&mut self, sender_id: &NodeUid, part: Part) -> Result> { + let handle = |kgs: &mut KeyGenState| kgs.key_gen.handle_part(&sender_id, part); + match self.key_gen_state.as_mut().and_then(handle) { + Some(PartOutcome::Valid(ack)) => self.send_transaction(KeyGenMessage::Ack(ack)), + Some(PartOutcome::Invalid(fault_log)) => Ok(fault_log.into()), + None => Ok(Step::default()), + } + } + + /// Handles an `Ack` message that was output by Honey Badger. + fn handle_ack(&mut self, sender_id: &NodeUid, ack: Ack) -> Result> { + if let Some(kgs) = self.key_gen_state.as_mut() { + Ok(kgs.key_gen.handle_ack(sender_id, ack)) + } else { + Ok(FaultLog::new()) + } + } + + /// Signs and sends a `KeyGenMessage` and also tries to commit it. + fn send_transaction(&mut self, kg_msg: KeyGenMessage) -> Result> { + let ser = + bincode::serialize(&kg_msg).map_err(|err| ErrorKind::SendTransactionBincode(*err))?; + let sig = Box::new(self.netinfo.secret_key().sign(ser)); + if self.netinfo.is_validator() { + let our_uid = self.netinfo.our_uid().clone(); + let signed_msg = + SignedKeyGenMsg(self.start_epoch, our_uid, kg_msg.clone(), *sig.clone()); + self.key_gen_msg_buffer.push(signed_msg); + } + let msg = Message::KeyGen(self.start_epoch, kg_msg, sig); + Ok(Target::All.message(msg).into()) + } + + /// If the current Key Generation process is ready, returns the `KeyGenState`. + /// + /// We require the minimum number of completed proposals (`SyncKeyGen::is_ready`) and if a new + /// node is joining, we require in addition that the new node's proposal is complete. That way + /// the new node knows that it's key is secret, without having to trust any number of nodes. + fn take_ready_key_gen(&mut self) -> Option> { + if self + .key_gen_state + .as_ref() + .map_or(false, KeyGenState::is_ready) + { + self.key_gen_state.take() + } else { + None + } + } + + /// Returns `true` if the signature of `kg_msg` by the node with the specified ID is valid. + /// Returns an error if the payload fails to serialize. + /// + /// This accepts signatures from both validators and the currently joining candidate, if any. + fn verify_signature( + &self, + node_id: &NodeUid, + sig: &Signature, + kg_msg: &KeyGenMessage, + ) -> Result { + let ser = + bincode::serialize(kg_msg).map_err(|err| ErrorKind::VerifySignatureBincode(*err))?; + let get_candidate_key = || { + self.key_gen_state + .as_ref() + .and_then(|kgs| kgs.candidate_key(node_id)) + }; + let pk_opt = self.netinfo.public_key(node_id).or_else(get_candidate_key); + Ok(pk_opt.map_or(false, |pk| pk.verify(&sig, ser))) + } +} diff --git a/src/dynamic_honey_badger/mod.rs b/src/dynamic_honey_badger/mod.rs index 2e54b4c..f7049e5 100644 --- a/src/dynamic_honey_badger/mod.rs +++ b/src/dynamic_honey_badger/mod.rs @@ -55,33 +55,30 @@ //! and replaced by a new one with the new set of participants. If a different change wins a //! vote before that happens, key generation resets again, and is attempted for the new change. +mod batch; +mod builder; +mod change; +mod dynamic_honey_badger; +mod error; +mod votes; + +use crypto::{PublicKey, PublicKeySet, Signature}; use rand::Rand; use std::collections::BTreeMap; use std::fmt::Debug; -use std::hash::Hash; -use std::mem; -use std::sync::Arc; - -use bincode; -use crypto::{PublicKey, PublicKeySet, Signature}; -use serde::{Deserialize, Serialize}; use self::votes::{SignedVote, VoteCounter}; -use fault_log::{Fault, FaultKind, FaultLog}; -use honey_badger::{self, HoneyBadger, Message as HbMessage}; -use messaging::{self, DistAlgorithm, NetworkInfo, Target}; -use sync_key_gen::{Ack, Part, PartOutcome, SyncKeyGen}; +use honey_badger::Message as HbMessage; +use messaging; +use sync_key_gen::{Ack, Part, SyncKeyGen}; pub use self::batch::Batch; pub use self::builder::DynamicHoneyBadgerBuilder; pub use self::change::{Change, ChangeState}; +pub use self::dynamic_honey_badger::DynamicHoneyBadger; pub use self::error::{Error, ErrorKind, Result}; -mod batch; -mod builder; -mod change; -mod error; -mod votes; +pub type Step = messaging::Step>; /// The user input for `DynamicHoneyBadger`. #[derive(Clone, Debug)] @@ -92,421 +89,6 @@ pub enum Input { Change(Change), } -/// A Honey Badger instance that can handle adding and removing nodes. -#[derive(Debug)] -pub struct DynamicHoneyBadger { - /// Shared network data. - netinfo: NetworkInfo, - /// The maximum number of future epochs for which we handle messages simultaneously. - max_future_epochs: usize, - /// The first epoch after the latest node change. - start_epoch: u64, - /// The buffer and counter for the pending and committed change votes. - vote_counter: VoteCounter, - /// Pending node transactions that we will propose in the next epoch. - key_gen_msg_buffer: Vec>, - /// The `HoneyBadger` instance with the current set of nodes. - honey_badger: HoneyBadger, NodeUid>, - /// The current key generation process, and the change it applies to. - key_gen_state: Option>, - /// A queue for messages from future epochs that cannot be handled yet. - incoming_queue: Vec<(NodeUid, Message)>, -} - -pub type Step = messaging::Step>; - -impl DistAlgorithm for DynamicHoneyBadger -where - C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash, - NodeUid: Eq + Ord + Clone + Serialize + for<'r> Deserialize<'r> + Debug + Hash + Rand, -{ - type NodeUid = NodeUid; - type Input = Input; - type Output = Batch; - type Message = Message; - type Error = Error; - - fn input(&mut self, input: Self::Input) -> Result> { - // User contributions are forwarded to `HoneyBadger` right away. Votes are signed and - // broadcast. - match input { - Input::User(contrib) => self.propose(contrib), - Input::Change(change) => self.vote_for(change), - } - } - - fn handle_message( - &mut self, - sender_id: &NodeUid, - message: Self::Message, - ) -> Result> { - let epoch = message.start_epoch(); - if epoch < self.start_epoch { - // Obsolete message. - Ok(Step::default()) - } else if epoch > self.start_epoch { - // Message cannot be handled yet. Save it for later. - let entry = (sender_id.clone(), message); - self.incoming_queue.push(entry); - Ok(Step::default()) - } else { - match message { - Message::HoneyBadger(_, hb_msg) => { - self.handle_honey_badger_message(sender_id, hb_msg) - } - Message::KeyGen(_, kg_msg, sig) => self - .handle_key_gen_message(sender_id, kg_msg, *sig) - .map(FaultLog::into), - Message::SignedVote(signed_vote) => self - .vote_counter - .add_pending_vote(sender_id, signed_vote) - .map(FaultLog::into), - } - } - } - - fn terminated(&self) -> bool { - false - } - - fn our_id(&self) -> &NodeUid { - self.netinfo.our_uid() - } -} - -impl DynamicHoneyBadger -where - C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash, - NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash + Rand, -{ - /// Returns a new `DynamicHoneyBadgerBuilder`. - pub fn builder() -> DynamicHoneyBadgerBuilder { - DynamicHoneyBadgerBuilder::new() - } - - /// Returns `true` if input for the current epoch has already been provided. - pub fn has_input(&self) -> bool { - self.honey_badger.has_input() - } - - /// Proposes a contribution in the current epoch. - pub fn propose(&mut self, contrib: C) -> Result> { - let step = self - .honey_badger - .input(InternalContrib { - contrib, - key_gen_messages: self.key_gen_msg_buffer.clone(), - votes: self.vote_counter.pending_votes().cloned().collect(), - }) - .map_err(ErrorKind::ProposeHoneyBadger)?; - self.process_output(step) - } - - /// Cast a vote to change the set of validators. - pub fn vote_for(&mut self, change: Change) -> Result> { - if !self.netinfo.is_validator() { - return Ok(Step::default()); // TODO: Return an error? - } - let signed_vote = self.vote_counter.sign_vote_for(change)?.clone(); - let msg = Message::SignedVote(signed_vote); - Ok(Target::All.message(msg).into()) - } - - /// Returns the information about the node IDs in the network, and the cryptographic keys. - pub fn netinfo(&self) -> &NetworkInfo { - &self.netinfo - } - - /// Returns `true` if we should make our contribution for the next epoch, even if we don't have - /// content ourselves, to avoid stalling the network. - /// - /// By proposing only if this returns `true`, you can prevent an adversary from making the - /// network output empty baches indefinitely, but it also means that the network won't advance - /// if fewer than _f + 1_ nodes have pending contributions. - pub fn should_propose(&self) -> bool { - if self.has_input() { - return false; // We have already proposed. - } - if self.honey_badger.received_proposals() > self.netinfo.num_faulty() { - return true; // At least one correct node wants to move on to the next epoch. - } - let is_our_vote = |signed_vote: &SignedVote<_>| signed_vote.voter() == self.our_id(); - if self.vote_counter.pending_votes().any(is_our_vote) { - return true; // We have pending input to vote for a validator change. - } - let kgs = match self.key_gen_state { - None => return false, // No ongoing key generation. - Some(ref kgs) => kgs, - }; - // If either we or the candidate have a pending key gen message, we should propose. - let ours_or_candidates = |msg: &SignedKeyGenMsg<_>| { - msg.1 == *self.our_id() || Some(&msg.1) == kgs.change.candidate() - }; - self.key_gen_msg_buffer.iter().any(ours_or_candidates) - } - - /// Handles a message for the `HoneyBadger` instance. - fn handle_honey_badger_message( - &mut self, - sender_id: &NodeUid, - message: HbMessage, - ) -> Result> { - if !self.netinfo.is_node_validator(sender_id) { - info!("Unknown sender {:?} of message {:?}", sender_id, message); - return Err(ErrorKind::UnknownSender.into()); - } - // Handle the message. - let step = self - .honey_badger - .handle_message(sender_id, message) - .map_err(ErrorKind::HandleHoneyBadgerMessageHoneyBadger)?; - self.process_output(step) - } - - /// Handles a vote or key generation message and tries to commit it as a transaction. These - /// messages are only handled once they appear in a batch output from Honey Badger. - fn handle_key_gen_message( - &mut self, - sender_id: &NodeUid, - kg_msg: KeyGenMessage, - sig: Signature, - ) -> Result> { - if !self.verify_signature(sender_id, &sig, &kg_msg)? { - info!("Invalid signature from {:?} for: {:?}.", sender_id, kg_msg); - let fault_kind = FaultKind::InvalidKeyGenMessageSignature; - return Ok(Fault::new(sender_id.clone(), fault_kind).into()); - } - let kgs = match self.key_gen_state { - Some(ref mut kgs) => kgs, - None => { - info!( - "Unexpected key gen message from {:?}: {:?}.", - sender_id, kg_msg - ); - return Ok(Fault::new(sender_id.clone(), FaultKind::UnexpectedKeyGenMessage).into()); - } - }; - - // If the joining node is correct, it will send at most (N + 1)² + 1 key generation - // messages. - if Some(sender_id) == kgs.change.candidate() { - let n = self.netinfo.num_nodes() + 1; - if kgs.candidate_msg_count > n * n { - info!( - "Too many key gen messages from candidate {:?}: {:?}.", - sender_id, kg_msg - ); - let fault_kind = FaultKind::TooManyCandidateKeyGenMessages; - return Ok(Fault::new(sender_id.clone(), fault_kind).into()); - } - kgs.candidate_msg_count += 1; - } - - let tx = SignedKeyGenMsg(self.start_epoch, sender_id.clone(), kg_msg, sig); - self.key_gen_msg_buffer.push(tx); - Ok(FaultLog::default()) - } - - /// Processes all pending batches output by Honey Badger. - fn process_output( - &mut self, - hb_step: honey_badger::Step, NodeUid>, - ) -> Result> { - let mut step: Step = Step::default(); - let start_epoch = self.start_epoch; - let output = step.extend_with(hb_step, |hb_msg| Message::HoneyBadger(start_epoch, hb_msg)); - for hb_batch in output { - // Create the batch we output ourselves. It will contain the _user_ transactions of - // `hb_batch`, and the current change state. - let mut batch = Batch::new(hb_batch.epoch + self.start_epoch); - - // Add the user transactions to `batch` and handle votes and DKG messages. - for (id, int_contrib) in hb_batch.contributions { - let InternalContrib { - votes, - key_gen_messages, - contrib, - } = int_contrib; - step.fault_log - .extend(self.vote_counter.add_committed_votes(&id, votes)?); - batch.contributions.insert(id.clone(), contrib); - self.key_gen_msg_buffer - .retain(|skgm| !key_gen_messages.contains(skgm)); - for SignedKeyGenMsg(epoch, s_id, kg_msg, sig) in key_gen_messages { - if epoch < self.start_epoch { - info!("Obsolete key generation message: {:?}.", kg_msg); - continue; - } - if !self.verify_signature(&s_id, &sig, &kg_msg)? { - info!( - "Invalid signature in {:?}'s batch from {:?} for: {:?}.", - id, s_id, kg_msg - ); - let fault_kind = FaultKind::InvalidKeyGenMessageSignature; - step.fault_log.append(id.clone(), fault_kind); - continue; - } - step.extend(match kg_msg { - KeyGenMessage::Part(part) => self.handle_part(&s_id, part)?, - KeyGenMessage::Ack(ack) => self.handle_ack(&s_id, ack)?.into(), - }); - } - } - - if let Some(kgs) = self.take_ready_key_gen() { - // If DKG completed, apply the change, restart Honey Badger, and inform the user. - debug!("{:?} DKG for {:?} complete!", self.our_id(), kgs.change); - self.netinfo = kgs.key_gen.into_network_info(); - self.restart_honey_badger(batch.epoch + 1); - batch.set_change(ChangeState::Complete(kgs.change), &self.netinfo); - } else if let Some(change) = self.vote_counter.compute_winner().cloned() { - // If there is a new change, restart DKG. Inform the user about the current change. - step.extend(self.update_key_gen(batch.epoch + 1, &change)?); - batch.set_change(ChangeState::InProgress(change), &self.netinfo); - } - step.output.push_back(batch); - } - // If `start_epoch` changed, we can now handle some queued messages. - if start_epoch < self.start_epoch { - let queue = mem::replace(&mut self.incoming_queue, Vec::new()); - for (sender_id, msg) in queue { - step.extend(self.handle_message(&sender_id, msg)?); - } - } - Ok(step) - } - - /// If the winner of the vote has changed, restarts Key Generation for the set of nodes implied - /// by the current change. - fn update_key_gen(&mut self, epoch: u64, change: &Change) -> Result> { - if self.key_gen_state.as_ref().map(|kgs| &kgs.change) == Some(change) { - return Ok(Step::default()); // The change is the same as before. Continue DKG as is. - } - debug!("{:?} Restarting DKG for {:?}.", self.our_id(), change); - // Use the existing key shares - with the change applied - as keys for DKG. - let mut pub_keys = self.netinfo.public_key_map().clone(); - if match *change { - Change::Remove(ref id) => pub_keys.remove(id).is_none(), - Change::Add(ref id, ref pk) => pub_keys.insert(id.clone(), pk.clone()).is_some(), - } { - info!("{:?} No-op change: {:?}", self.our_id(), change); - } - self.restart_honey_badger(epoch); - // TODO: This needs to be the same as `num_faulty` will be in the _new_ - // `NetworkInfo` if the change goes through. It would be safer to deduplicate. - let threshold = (pub_keys.len() - 1) / 3; - let sk = self.netinfo.secret_key().clone(); - let our_uid = self.our_id().clone(); - let (key_gen, part) = SyncKeyGen::new(our_uid, sk, pub_keys, threshold); - self.key_gen_state = Some(KeyGenState::new(key_gen, change.clone())); - if let Some(part) = part { - self.send_transaction(KeyGenMessage::Part(part)) - } else { - Ok(Step::default()) - } - } - - /// Starts a new `HoneyBadger` instance and resets the vote counter. - fn restart_honey_badger(&mut self, epoch: u64) { - self.start_epoch = epoch; - self.key_gen_msg_buffer.retain(|kg_msg| kg_msg.0 >= epoch); - let netinfo = Arc::new(self.netinfo.clone()); - let counter = VoteCounter::new(netinfo.clone(), epoch); - mem::replace(&mut self.vote_counter, counter); - self.honey_badger = HoneyBadger::builder(netinfo) - .max_future_epochs(self.max_future_epochs) - .build(); - } - - /// Handles a `Part` message that was output by Honey Badger. - fn handle_part(&mut self, sender_id: &NodeUid, part: Part) -> Result> { - let handle = |kgs: &mut KeyGenState| kgs.key_gen.handle_part(&sender_id, part); - match self.key_gen_state.as_mut().and_then(handle) { - Some(PartOutcome::Valid(ack)) => self.send_transaction(KeyGenMessage::Ack(ack)), - Some(PartOutcome::Invalid(fault_log)) => Ok(fault_log.into()), - None => Ok(Step::default()), - } - } - - /// Handles an `Ack` message that was output by Honey Badger. - fn handle_ack(&mut self, sender_id: &NodeUid, ack: Ack) -> Result> { - if let Some(kgs) = self.key_gen_state.as_mut() { - Ok(kgs.key_gen.handle_ack(sender_id, ack)) - } else { - Ok(FaultLog::new()) - } - } - - /// Signs and sends a `KeyGenMessage` and also tries to commit it. - fn send_transaction(&mut self, kg_msg: KeyGenMessage) -> Result> { - let ser = - bincode::serialize(&kg_msg).map_err(|err| ErrorKind::SendTransactionBincode(*err))?; - let sig = Box::new(self.netinfo.secret_key().sign(ser)); - if self.netinfo.is_validator() { - let our_uid = self.netinfo.our_uid().clone(); - let signed_msg = - SignedKeyGenMsg(self.start_epoch, our_uid, kg_msg.clone(), *sig.clone()); - self.key_gen_msg_buffer.push(signed_msg); - } - let msg = Message::KeyGen(self.start_epoch, kg_msg, sig); - Ok(Target::All.message(msg).into()) - } - - /// If the current Key Generation process is ready, returns the `KeyGenState`. - /// - /// We require the minimum number of completed proposals (`SyncKeyGen::is_ready`) and if a new - /// node is joining, we require in addition that the new node's proposal is complete. That way - /// the new node knows that it's key is secret, without having to trust any number of nodes. - fn take_ready_key_gen(&mut self) -> Option> { - if self - .key_gen_state - .as_ref() - .map_or(false, KeyGenState::is_ready) - { - self.key_gen_state.take() - } else { - None - } - } - - /// Returns `true` if the signature of `kg_msg` by the node with the specified ID is valid. - /// Returns an error if the payload fails to serialize. - /// - /// This accepts signatures from both validators and the currently joining candidate, if any. - fn verify_signature( - &self, - node_id: &NodeUid, - sig: &Signature, - kg_msg: &KeyGenMessage, - ) -> Result { - let ser = - bincode::serialize(kg_msg).map_err(|err| ErrorKind::VerifySignatureBincode(*err))?; - let get_candidate_key = || { - self.key_gen_state - .as_ref() - .and_then(|kgs| kgs.candidate_key(node_id)) - }; - let pk_opt = self.netinfo.public_key(node_id).or_else(get_candidate_key); - Ok(pk_opt.map_or(false, |pk| pk.verify(&sig, ser))) - } -} - -/// The contribution for the internal `HoneyBadger` instance: this includes a user-defined -/// application-level contribution as well as internal signed messages. -#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)] -struct InternalContrib { - /// A user-defined contribution. - contrib: C, - /// Key generation messages that get committed via Honey Badger to communicate synchronously. - key_gen_messages: Vec>, - /// Signed votes for validator set changes. - votes: Vec>, -} - -/// A signed internal message. -#[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Clone)] -struct SignedKeyGenMsg(u64, NodeUid, KeyGenMessage, Signature); - /// An internal message containing a vote for adding or removing a validator, or a message for key /// generation. It gets committed via Honey Badger and is only handled after it has been output in /// a batch, so that all nodes see these messages in the same order. @@ -598,3 +180,19 @@ impl KeyGenState { } } } + +/// The contribution for the internal `HoneyBadger` instance: this includes a user-defined +/// application-level contribution as well as internal signed messages. +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)] +struct InternalContrib { + /// A user-defined contribution. + contrib: C, + /// Key generation messages that get committed via Honey Badger to communicate synchronously. + key_gen_messages: Vec>, + /// Signed votes for validator set changes. + votes: Vec>, +} + +/// A signed internal message. +#[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Clone)] +struct SignedKeyGenMsg(u64, NodeUid, KeyGenMessage, Signature); diff --git a/src/dynamic_honey_badger/votes.rs b/src/dynamic_honey_badger/votes.rs index b714bcc..754343d 100644 --- a/src/dynamic_honey_badger/votes.rs +++ b/src/dynamic_honey_badger/votes.rs @@ -67,11 +67,10 @@ where sender_id: &NodeUid, signed_vote: SignedVote, ) -> Result> { - if signed_vote.vote.era != self.era - || self - .pending - .get(&signed_vote.voter) - .map_or(false, |sv| sv.vote.num >= signed_vote.vote.num) + if signed_vote.vote.era != self.era || self + .pending + .get(&signed_vote.voter) + .map_or(false, |sv| sv.vote.num >= signed_vote.vote.num) { return Ok(FaultLog::new()); // The vote is obsolete or already exists. } @@ -150,8 +149,8 @@ where /// Returns `true` if the signature is valid. fn validate(&self, signed_vote: &SignedVote) -> Result { - let ser_vote = - bincode::serialize(&signed_vote.vote).map_err(|err| ErrorKind::ValidateBincode(*err))?; + let ser_vote = bincode::serialize(&signed_vote.vote) + .map_err(|err| ErrorKind::ValidateBincode(*err))?; let pk_opt = self.netinfo.public_key(&signed_vote.voter); Ok(pk_opt.map_or(false, |pk| pk.verify(&signed_vote.sig, ser_vote))) } diff --git a/src/honey_badger/honey_badger.rs b/src/honey_badger/honey_badger.rs index 553a1a1..937c6c0 100644 --- a/src/honey_badger/honey_badger.rs +++ b/src/honey_badger/honey_badger.rs @@ -270,8 +270,7 @@ where step.fault_log.append(proposer_id, fault_kind); None } - }) - .collect(); + }).collect(); let batch = Batch { epoch: self.epoch, contributions, diff --git a/tests/dynamic_honey_badger.rs b/tests/dynamic_honey_badger.rs index e31bb04..f64ab42 100644 --- a/tests/dynamic_honey_badger.rs +++ b/tests/dynamic_honey_badger.rs @@ -81,8 +81,7 @@ where // If there's only one node, it will immediately output on input. Make sure we // first process all incoming messages before providing input again. && (network.nodes.len() > 2 || node.queue.is_empty()) - }) - .map(|(id, _)| *id) + }).map(|(id, _)| *id) .collect(); if let Some(id) = rng.choose(&input_ids) { let queue = queues.get_mut(id).unwrap(); diff --git a/tests/honey_badger.rs b/tests/honey_badger.rs index 8d1ae0a..b5f2f06 100644 --- a/tests/honey_badger.rs +++ b/tests/honey_badger.rs @@ -175,8 +175,7 @@ where epoch, contributions, }| (epoch, contributions), - ) - .collect(); + ).collect(); if expected.is_none() { expected = Some(outputs); } else if let Some(expected) = &expected { diff --git a/tests/sync_key_gen.rs b/tests/sync_key_gen.rs index 49d0558..d6099e4 100644 --- a/tests/sync_key_gen.rs +++ b/tests/sync_key_gen.rs @@ -31,8 +31,7 @@ fn test_sync_key_gen_with(threshold: usize, node_num: usize) { let (sync_key_gen, proposal) = SyncKeyGen::new(id, sk, pub_keys.clone(), threshold); nodes.push(sync_key_gen); proposal - }) - .collect(); + }).collect(); // Handle the first `threshold + 1` proposals. Those should suffice for key generation. let mut acks = Vec::new(); @@ -72,8 +71,7 @@ fn test_sync_key_gen_with(threshold: usize, node_num: usize) { let sig = sk.sign(msg); assert!(pks.public_key_share(idx).verify(&sig, msg)); (idx, sig) - }) - .collect(); + }).collect(); let sig = pub_key_set .combine_signatures(sig_shares.iter().take(threshold + 1)) .expect("signature shares match");