mirror of https://github.com/poanetwork/hbbft.git
Merge pull request #184 from c0gent/c0gent-mod-reorg
Reorganize `dynamic_honey_badger` and `agreement` modules slightly. * Some additional rustfmt-ing crept in due to me accidentally using a newer version.
This commit is contained in:
commit
d74530004f
|
@ -41,7 +41,6 @@ pub fn make(
|
||||||
TcpStream::connect(address).expect("failed to connect")
|
TcpStream::connect(address).expect("failed to connect")
|
||||||
};
|
};
|
||||||
Connection::new(tcp_conn, there_str.to_string())
|
Connection::new(tcp_conn, there_str.to_string())
|
||||||
})
|
}).collect();
|
||||||
.collect();
|
|
||||||
(here_str, connections)
|
(here_str, connections)
|
||||||
}
|
}
|
||||||
|
|
|
@ -208,8 +208,7 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
|
||||||
.send(())
|
.send(())
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
error!("{}", e);
|
error!("{}", e);
|
||||||
})
|
}).unwrap();
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
process::exit(0);
|
process::exit(0);
|
||||||
}) // end of thread scope
|
}) // end of thread scope
|
||||||
|
|
|
@ -156,8 +156,7 @@ where
|
||||||
target: msg.target,
|
target: msg.target,
|
||||||
message: ser_msg,
|
message: ser_msg,
|
||||||
}
|
}
|
||||||
})
|
}).collect();
|
||||||
.collect();
|
|
||||||
let outputs = step
|
let outputs = step
|
||||||
.output
|
.output
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -204,8 +203,7 @@ where
|
||||||
.map(|msg| {
|
.map(|msg| {
|
||||||
let ser_msg = bincode::serialize(&msg.message).expect("serialize");
|
let ser_msg = bincode::serialize(&msg.message).expect("serialize");
|
||||||
(msg.target, ser_msg)
|
(msg.target, ser_msg)
|
||||||
})
|
}).collect();
|
||||||
.collect();
|
|
||||||
self.time += start.elapsed() * self.hw_quality.cpu_factor / 100;
|
self.time += start.elapsed() * self.hw_quality.cpu_factor / 100;
|
||||||
let time = self.time;
|
let time = self.time;
|
||||||
self.outputs
|
self.outputs
|
||||||
|
|
|
@ -0,0 +1,422 @@
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::fmt::Debug;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use itertools::Itertools;
|
||||||
|
|
||||||
|
use super::bool_multimap::BoolMultimap;
|
||||||
|
use super::sbv_broadcast::{self, SbvBroadcast};
|
||||||
|
use super::{AgreementContent, AgreementMessage, Error, Nonce, Result, Step};
|
||||||
|
use agreement::bool_set::BoolSet;
|
||||||
|
use common_coin::{self, CommonCoin, CommonCoinMessage};
|
||||||
|
use messaging::{DistAlgorithm, NetworkInfo, Target};
|
||||||
|
|
||||||
|
/// The state of the current epoch's common coin. In some epochs this is fixed, in others it starts
|
||||||
|
/// with in `InProgress`.
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum CoinState<NodeUid> {
|
||||||
|
/// The value was fixed in the current epoch, or the coin has already terminated.
|
||||||
|
Decided(bool),
|
||||||
|
/// The coin value is not known yet.
|
||||||
|
InProgress(CommonCoin<NodeUid, Nonce>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NodeUid> CoinState<NodeUid> {
|
||||||
|
/// Returns the value, if this coin has already decided.
|
||||||
|
fn value(&self) -> Option<bool> {
|
||||||
|
match self {
|
||||||
|
CoinState::Decided(value) => Some(*value),
|
||||||
|
CoinState::InProgress(_) => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NodeUid> From<bool> for CoinState<NodeUid> {
|
||||||
|
fn from(value: bool) -> Self {
|
||||||
|
CoinState::Decided(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Binary Agreement instance
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Agreement<NodeUid> {
|
||||||
|
/// Shared network information.
|
||||||
|
netinfo: Arc<NetworkInfo<NodeUid>>,
|
||||||
|
/// Session ID, e.g, the Honey Badger algorithm epoch.
|
||||||
|
session_id: u64,
|
||||||
|
/// The ID of the proposer of the value for this agreement instance.
|
||||||
|
proposer_id: NodeUid,
|
||||||
|
/// Agreement algorithm epoch.
|
||||||
|
epoch: u32,
|
||||||
|
/// This epoch's Synchronized Binary Value Broadcast instance.
|
||||||
|
sbv_broadcast: SbvBroadcast<NodeUid>,
|
||||||
|
/// Received `Conf` messages. Reset on every epoch update.
|
||||||
|
received_conf: BTreeMap<NodeUid, BoolSet>,
|
||||||
|
/// Received `Term` messages. Kept throughout epoch updates. These count as `BVal`, `Aux` and
|
||||||
|
/// `Conf` messages for all future epochs.
|
||||||
|
received_term: BoolMultimap<NodeUid>,
|
||||||
|
/// The estimate of the decision value in the current epoch.
|
||||||
|
estimated: Option<bool>,
|
||||||
|
/// A permanent, latching copy of the output value. This copy is required because `output` can
|
||||||
|
/// be consumed using `DistAlgorithm::next_output` immediately after the instance finishing to
|
||||||
|
/// handle a message, in which case it would otherwise be unknown whether the output value was
|
||||||
|
/// ever there at all. While the output value will still be required in a later epoch to decide
|
||||||
|
/// the termination state.
|
||||||
|
decision: Option<bool>,
|
||||||
|
/// A cache for messages for future epochs that cannot be handled yet.
|
||||||
|
// TODO: Find a better solution for this; defend against spam.
|
||||||
|
incoming_queue: BTreeMap<u32, Vec<(NodeUid, AgreementContent)>>,
|
||||||
|
/// The values we found in the first _N - f_ `Aux` messages that were in `bin_values`.
|
||||||
|
conf_values: Option<BoolSet>,
|
||||||
|
/// The state of this epoch's common coin.
|
||||||
|
coin_state: CoinState<NodeUid>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
|
||||||
|
type NodeUid = NodeUid;
|
||||||
|
type Input = bool;
|
||||||
|
type Output = bool;
|
||||||
|
type Message = AgreementMessage;
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn input(&mut self, input: Self::Input) -> Result<Step<NodeUid>> {
|
||||||
|
self.set_input(input)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Receive input from a remote node.
|
||||||
|
fn handle_message(
|
||||||
|
&mut self,
|
||||||
|
sender_id: &Self::NodeUid,
|
||||||
|
AgreementMessage { epoch, content }: Self::Message,
|
||||||
|
) -> Result<Step<NodeUid>> {
|
||||||
|
if self.decision.is_some() || (epoch < self.epoch && content.can_expire()) {
|
||||||
|
// Message is obsolete: We are already in a later epoch or terminated.
|
||||||
|
Ok(Step::default())
|
||||||
|
} else if epoch > self.epoch {
|
||||||
|
// Message is for a later epoch. We can't handle that yet.
|
||||||
|
let queue = self.incoming_queue.entry(epoch).or_insert_with(Vec::new);
|
||||||
|
queue.push((sender_id.clone(), content));
|
||||||
|
Ok(Step::default())
|
||||||
|
} else {
|
||||||
|
self.handle_message_content(sender_id, content)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Whether the algorithm has terminated.
|
||||||
|
fn terminated(&self) -> bool {
|
||||||
|
self.decision.is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn our_id(&self) -> &Self::NodeUid {
|
||||||
|
self.netinfo.our_uid()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
|
pub fn new(
|
||||||
|
netinfo: Arc<NetworkInfo<NodeUid>>,
|
||||||
|
session_id: u64,
|
||||||
|
proposer_id: NodeUid,
|
||||||
|
) -> Result<Self> {
|
||||||
|
if !netinfo.is_node_validator(&proposer_id) {
|
||||||
|
return Err(Error::UnknownProposer);
|
||||||
|
}
|
||||||
|
Ok(Agreement {
|
||||||
|
netinfo: netinfo.clone(),
|
||||||
|
session_id,
|
||||||
|
proposer_id,
|
||||||
|
epoch: 0,
|
||||||
|
sbv_broadcast: SbvBroadcast::new(netinfo),
|
||||||
|
received_conf: BTreeMap::new(),
|
||||||
|
received_term: BoolMultimap::default(),
|
||||||
|
estimated: None,
|
||||||
|
decision: None,
|
||||||
|
incoming_queue: BTreeMap::new(),
|
||||||
|
conf_values: None,
|
||||||
|
coin_state: CoinState::Decided(true),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sets the input value for agreement.
|
||||||
|
fn set_input(&mut self, input: bool) -> Result<Step<NodeUid>> {
|
||||||
|
if self.epoch != 0 || self.estimated.is_some() {
|
||||||
|
return Err(Error::InputNotAccepted);
|
||||||
|
}
|
||||||
|
// Set the initial estimated value to the input value.
|
||||||
|
self.estimated = Some(input);
|
||||||
|
debug!("{:?}/{:?} Input {}", self.our_id(), self.proposer_id, input);
|
||||||
|
let sbvb_step = self.sbv_broadcast.input(input)?;
|
||||||
|
self.handle_sbvb_step(sbvb_step)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Acceptance check to be performed before setting the input value.
|
||||||
|
pub fn accepts_input(&self) -> bool {
|
||||||
|
self.epoch == 0 && self.estimated.is_none()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dispatches the message content to the corresponding handling method.
|
||||||
|
fn handle_message_content(
|
||||||
|
&mut self,
|
||||||
|
sender_id: &NodeUid,
|
||||||
|
content: AgreementContent,
|
||||||
|
) -> Result<Step<NodeUid>> {
|
||||||
|
match content {
|
||||||
|
AgreementContent::SbvBroadcast(msg) => self.handle_sbv_broadcast(sender_id, msg),
|
||||||
|
AgreementContent::Conf(v) => self.handle_conf(sender_id, v),
|
||||||
|
AgreementContent::Term(v) => self.handle_term(sender_id, v),
|
||||||
|
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles a Synchroniced Binary Value Broadcast message.
|
||||||
|
fn handle_sbv_broadcast(
|
||||||
|
&mut self,
|
||||||
|
sender_id: &NodeUid,
|
||||||
|
msg: sbv_broadcast::Message,
|
||||||
|
) -> Result<Step<NodeUid>> {
|
||||||
|
let sbvb_step = self.sbv_broadcast.handle_message(sender_id, msg)?;
|
||||||
|
self.handle_sbvb_step(sbvb_step)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles a Synchronized Binary Value Broadcast step. On output, starts the `Conf` round or
|
||||||
|
/// decides.
|
||||||
|
fn handle_sbvb_step(
|
||||||
|
&mut self,
|
||||||
|
sbvb_step: sbv_broadcast::Step<NodeUid>,
|
||||||
|
) -> Result<Step<NodeUid>> {
|
||||||
|
let mut step = Step::default();
|
||||||
|
let output = step.extend_with(sbvb_step, |msg| {
|
||||||
|
AgreementContent::SbvBroadcast(msg).with_epoch(self.epoch)
|
||||||
|
});
|
||||||
|
if self.conf_values.is_some() {
|
||||||
|
return Ok(step); // The `Conf` round has already started.
|
||||||
|
}
|
||||||
|
if let Some(aux_vals) = output.into_iter().next() {
|
||||||
|
// Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...`
|
||||||
|
match self.coin_state {
|
||||||
|
CoinState::Decided(_) => {
|
||||||
|
self.conf_values = Some(aux_vals);
|
||||||
|
step.extend(self.try_update_epoch()?)
|
||||||
|
}
|
||||||
|
CoinState::InProgress(_) => {
|
||||||
|
// Start the `Conf` message round.
|
||||||
|
step.extend(self.send_conf(aux_vals)?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(step)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have
|
||||||
|
/// been received, updates the epoch or decides.
|
||||||
|
fn handle_conf(&mut self, sender_id: &NodeUid, v: BoolSet) -> Result<Step<NodeUid>> {
|
||||||
|
self.received_conf.insert(sender_id.clone(), v);
|
||||||
|
self.try_finish_conf_round()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles a `Term(v)` message. If we haven't yet decided on a value and there are more than
|
||||||
|
/// _f_ such messages with the same value from different nodes, performs expedite termination:
|
||||||
|
/// decides on `v`, broadcasts `Term(v)` and terminates the instance.
|
||||||
|
fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
|
||||||
|
self.received_term[b].insert(sender_id.clone());
|
||||||
|
// Check for the expedite termination condition.
|
||||||
|
if self.decision.is_some() {
|
||||||
|
Ok(Step::default())
|
||||||
|
} else if self.received_term[b].len() > self.netinfo.num_faulty() {
|
||||||
|
Ok(self.decide(b))
|
||||||
|
} else {
|
||||||
|
// Otherwise handle the `Term` as a `BVal`, `Aux` and `Conf`.
|
||||||
|
let mut sbvb_step = self.sbv_broadcast.handle_bval(sender_id, b)?;
|
||||||
|
sbvb_step.extend(self.sbv_broadcast.handle_aux(sender_id, b)?);
|
||||||
|
let mut step = self.handle_sbvb_step(sbvb_step)?;
|
||||||
|
step.extend(self.handle_conf(sender_id, BoolSet::from(b))?);
|
||||||
|
Ok(step)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles a Common Coin message. If there is output from Common Coin, starts the next
|
||||||
|
/// epoch. The function may output a decision value.
|
||||||
|
fn handle_coin(
|
||||||
|
&mut self,
|
||||||
|
sender_id: &NodeUid,
|
||||||
|
msg: CommonCoinMessage,
|
||||||
|
) -> Result<Step<NodeUid>> {
|
||||||
|
let coin_step = match self.coin_state {
|
||||||
|
CoinState::Decided(_) => return Ok(Step::default()), // Coin value is already decided.
|
||||||
|
CoinState::InProgress(ref mut common_coin) => common_coin
|
||||||
|
.handle_message(sender_id, msg)
|
||||||
|
.map_err(Error::HandleCoinCommonCoin)?,
|
||||||
|
};
|
||||||
|
self.on_coin_step(coin_step)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Multicasts a `Conf(values)` message, and handles it.
|
||||||
|
fn send_conf(&mut self, values: BoolSet) -> Result<Step<NodeUid>> {
|
||||||
|
if self.conf_values.is_some() {
|
||||||
|
// Only one `Conf` message is allowed in an epoch.
|
||||||
|
return Ok(Step::default());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trigger the start of the `Conf` round.
|
||||||
|
self.conf_values = Some(values);
|
||||||
|
|
||||||
|
if !self.netinfo.is_validator() {
|
||||||
|
return Ok(self.try_finish_conf_round()?);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.send(AgreementContent::Conf(values))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Multicasts and handles a message. Does nothing if we are only an observer.
|
||||||
|
fn send(&mut self, content: AgreementContent) -> Result<Step<NodeUid>> {
|
||||||
|
if !self.netinfo.is_validator() {
|
||||||
|
return Ok(Step::default());
|
||||||
|
}
|
||||||
|
let mut step: Step<_> = Target::All
|
||||||
|
.message(content.clone().with_epoch(self.epoch))
|
||||||
|
.into();
|
||||||
|
let our_uid = &self.netinfo.our_uid().clone();
|
||||||
|
step.extend(self.handle_message_content(our_uid, content)?);
|
||||||
|
Ok(step)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles a step returned from the `CommonCoin`.
|
||||||
|
fn on_coin_step(
|
||||||
|
&mut self,
|
||||||
|
coin_step: common_coin::Step<NodeUid, Nonce>,
|
||||||
|
) -> Result<Step<NodeUid>> {
|
||||||
|
let mut step = Step::default();
|
||||||
|
let epoch = self.epoch;
|
||||||
|
let to_msg = |c_msg| AgreementContent::Coin(Box::new(c_msg)).with_epoch(epoch);
|
||||||
|
let coin_output = step.extend_with(coin_step, to_msg);
|
||||||
|
if let Some(coin) = coin_output.into_iter().next() {
|
||||||
|
self.coin_state = coin.into();
|
||||||
|
step.extend(self.try_update_epoch()?);
|
||||||
|
}
|
||||||
|
Ok(step)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// If this epoch's coin value or conf values are not known yet, does nothing, otherwise
|
||||||
|
/// updates the epoch or decides.
|
||||||
|
///
|
||||||
|
/// With two conf values, the next epoch's estimate is the coin value. If there is only one conf
|
||||||
|
/// value and that disagrees with the coin, the conf value is the next epoch's estimate. If
|
||||||
|
/// the unique conf value agrees with the coin, terminates and decides on that value.
|
||||||
|
fn try_update_epoch(&mut self) -> Result<Step<NodeUid>> {
|
||||||
|
if self.decision.is_some() {
|
||||||
|
// Avoid an infinite regression without making an Agreement step.
|
||||||
|
return Ok(Step::default());
|
||||||
|
}
|
||||||
|
let coin = match self.coin_state.value() {
|
||||||
|
None => return Ok(Step::default()), // Still waiting for coin value.
|
||||||
|
Some(coin) => coin,
|
||||||
|
};
|
||||||
|
let def_bin_value = match self.conf_values {
|
||||||
|
None => return Ok(Step::default()), // Still waiting for conf value.
|
||||||
|
Some(ref values) => values.definite(),
|
||||||
|
};
|
||||||
|
|
||||||
|
if Some(coin) == def_bin_value {
|
||||||
|
Ok(self.decide(coin))
|
||||||
|
} else {
|
||||||
|
self.update_epoch(def_bin_value.unwrap_or(coin))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates the initial coin state for the current epoch, i.e. sets it to the predetermined
|
||||||
|
/// value, or initializes a `CommonCoin` instance.
|
||||||
|
fn coin_state(&self) -> CoinState<NodeUid> {
|
||||||
|
match self.epoch % 3 {
|
||||||
|
0 => CoinState::Decided(true),
|
||||||
|
1 => CoinState::Decided(false),
|
||||||
|
_ => {
|
||||||
|
let nonce = Nonce::new(
|
||||||
|
self.netinfo.invocation_id().as_ref(),
|
||||||
|
self.session_id,
|
||||||
|
self.netinfo.node_index(&self.proposer_id).unwrap(),
|
||||||
|
self.epoch,
|
||||||
|
);
|
||||||
|
CoinState::InProgress(CommonCoin::new(self.netinfo.clone(), nonce))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Decides on a value and broadcasts a `Term` message with that value.
|
||||||
|
fn decide(&mut self, b: bool) -> Step<NodeUid> {
|
||||||
|
if self.decision.is_some() {
|
||||||
|
return Step::default();
|
||||||
|
}
|
||||||
|
// Output the agreement value.
|
||||||
|
let mut step = Step::default();
|
||||||
|
step.output.push_back(b);
|
||||||
|
// Latch the decided state.
|
||||||
|
self.decision = Some(b);
|
||||||
|
debug!(
|
||||||
|
"{:?}/{:?} (is_validator: {}) decision: {}",
|
||||||
|
self.netinfo.our_uid(),
|
||||||
|
self.proposer_id,
|
||||||
|
self.netinfo.is_validator(),
|
||||||
|
b
|
||||||
|
);
|
||||||
|
if self.netinfo.is_validator() {
|
||||||
|
let msg = AgreementContent::Term(b).with_epoch(self.epoch + 1);
|
||||||
|
step.messages.push_back(Target::All.message(msg));
|
||||||
|
}
|
||||||
|
step
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Checks whether the _N - f_ `Conf` messages have arrived, and if so, activates the coin.
|
||||||
|
fn try_finish_conf_round(&mut self) -> Result<Step<NodeUid>> {
|
||||||
|
if self.conf_values.is_none() || self.count_conf() < self.netinfo.num_correct() {
|
||||||
|
return Ok(Step::default());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invoke the common coin.
|
||||||
|
let coin_step = match self.coin_state {
|
||||||
|
CoinState::Decided(_) => return Ok(Step::default()), // Coin has already decided.
|
||||||
|
CoinState::InProgress(ref mut common_coin) => common_coin
|
||||||
|
.input(())
|
||||||
|
.map_err(Error::TryFinishConfRoundCommonCoin)?,
|
||||||
|
};
|
||||||
|
let mut step = self.on_coin_step(coin_step)?;
|
||||||
|
step.extend(self.try_update_epoch()?);
|
||||||
|
Ok(step)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Counts the number of received `Conf` messages with values in `bin_values`.
|
||||||
|
fn count_conf(&self) -> usize {
|
||||||
|
let is_bin_val = |conf: &&BoolSet| conf.is_subset(self.sbv_broadcast.bin_values());
|
||||||
|
self.received_conf.values().filter(is_bin_val).count()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Increments the epoch, sets the new estimate and handles queued messages.
|
||||||
|
fn update_epoch(&mut self, b: bool) -> Result<Step<NodeUid>> {
|
||||||
|
self.sbv_broadcast.clear(&self.received_term);
|
||||||
|
self.received_conf.clear();
|
||||||
|
for (v, id) in &self.received_term {
|
||||||
|
self.received_conf.insert(id.clone(), BoolSet::from(v));
|
||||||
|
}
|
||||||
|
self.conf_values = None;
|
||||||
|
self.epoch += 1;
|
||||||
|
self.coin_state = self.coin_state();
|
||||||
|
debug!(
|
||||||
|
"{:?} Agreement instance {:?} started epoch {}, {} terminated",
|
||||||
|
self.netinfo.our_uid(),
|
||||||
|
self.proposer_id,
|
||||||
|
self.epoch,
|
||||||
|
self.received_conf.len(),
|
||||||
|
);
|
||||||
|
|
||||||
|
self.estimated = Some(b);
|
||||||
|
let sbvb_step = self.sbv_broadcast.input(b)?;
|
||||||
|
let mut step = self.handle_sbvb_step(sbvb_step)?;
|
||||||
|
let queued_msgs = Itertools::flatten(self.incoming_queue.remove(&self.epoch).into_iter());
|
||||||
|
for (sender_id, content) in queued_msgs {
|
||||||
|
step.extend(self.handle_message_content(&sender_id, content)?);
|
||||||
|
if self.decision.is_some() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(step)
|
||||||
|
}
|
||||||
|
}
|
|
@ -63,22 +63,18 @@
|
||||||
//! * After _f + 1_ nodes have sent us their coin shares, we receive the coin output and assign it
|
//! * After _f + 1_ nodes have sent us their coin shares, we receive the coin output and assign it
|
||||||
//! to `s`.
|
//! to `s`.
|
||||||
|
|
||||||
|
mod agreement;
|
||||||
mod bool_multimap;
|
mod bool_multimap;
|
||||||
pub mod bool_set;
|
pub mod bool_set;
|
||||||
mod sbv_broadcast;
|
mod sbv_broadcast;
|
||||||
|
|
||||||
use rand;
|
use rand;
|
||||||
use std::collections::BTreeMap;
|
|
||||||
use std::fmt::Debug;
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use itertools::Itertools;
|
use self::bool_set::BoolSet;
|
||||||
|
use common_coin::{self, CommonCoinMessage};
|
||||||
|
use messaging;
|
||||||
|
|
||||||
use self::bool_multimap::BoolMultimap;
|
pub use self::agreement::Agreement;
|
||||||
use self::sbv_broadcast::SbvBroadcast;
|
|
||||||
use agreement::bool_set::BoolSet;
|
|
||||||
use common_coin::{self, CommonCoin, CommonCoinMessage};
|
|
||||||
use messaging::{self, DistAlgorithm, NetworkInfo, Target};
|
|
||||||
|
|
||||||
/// An agreement error.
|
/// An agreement error.
|
||||||
#[derive(Clone, Eq, PartialEq, Debug, Fail)]
|
#[derive(Clone, Eq, PartialEq, Debug, Fail)]
|
||||||
|
@ -96,6 +92,8 @@ pub enum Error {
|
||||||
/// An agreement result.
|
/// An agreement result.
|
||||||
pub type Result<T> = ::std::result::Result<T, Error>;
|
pub type Result<T> = ::std::result::Result<T, Error>;
|
||||||
|
|
||||||
|
pub type Step<NodeUid> = messaging::Step<Agreement<NodeUid>>;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||||
pub enum AgreementContent {
|
pub enum AgreementContent {
|
||||||
/// Synchronized Binary Value Broadcast message.
|
/// Synchronized Binary Value Broadcast message.
|
||||||
|
@ -150,418 +148,6 @@ impl rand::Rand for AgreementContent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The state of the current epoch's common coin. In some epochs this is fixed, in others it starts
|
|
||||||
/// with in `InProgress`.
|
|
||||||
#[derive(Debug)]
|
|
||||||
enum CoinState<NodeUid> {
|
|
||||||
/// The value was fixed in the current epoch, or the coin has already terminated.
|
|
||||||
Decided(bool),
|
|
||||||
/// The coin value is not known yet.
|
|
||||||
InProgress(CommonCoin<NodeUid, Nonce>),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<NodeUid> CoinState<NodeUid> {
|
|
||||||
/// Returns the value, if this coin has already decided.
|
|
||||||
fn value(&self) -> Option<bool> {
|
|
||||||
match self {
|
|
||||||
CoinState::Decided(value) => Some(*value),
|
|
||||||
CoinState::InProgress(_) => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<NodeUid> From<bool> for CoinState<NodeUid> {
|
|
||||||
fn from(value: bool) -> Self {
|
|
||||||
CoinState::Decided(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Binary Agreement instance
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Agreement<NodeUid> {
|
|
||||||
/// Shared network information.
|
|
||||||
netinfo: Arc<NetworkInfo<NodeUid>>,
|
|
||||||
/// Session ID, e.g, the Honey Badger algorithm epoch.
|
|
||||||
session_id: u64,
|
|
||||||
/// The ID of the proposer of the value for this agreement instance.
|
|
||||||
proposer_id: NodeUid,
|
|
||||||
/// Agreement algorithm epoch.
|
|
||||||
epoch: u32,
|
|
||||||
/// This epoch's Synchronized Binary Value Broadcast instance.
|
|
||||||
sbv_broadcast: SbvBroadcast<NodeUid>,
|
|
||||||
/// Received `Conf` messages. Reset on every epoch update.
|
|
||||||
received_conf: BTreeMap<NodeUid, BoolSet>,
|
|
||||||
/// Received `Term` messages. Kept throughout epoch updates. These count as `BVal`, `Aux` and
|
|
||||||
/// `Conf` messages for all future epochs.
|
|
||||||
received_term: BoolMultimap<NodeUid>,
|
|
||||||
/// The estimate of the decision value in the current epoch.
|
|
||||||
estimated: Option<bool>,
|
|
||||||
/// A permanent, latching copy of the output value. This copy is required because `output` can
|
|
||||||
/// be consumed using `DistAlgorithm::next_output` immediately after the instance finishing to
|
|
||||||
/// handle a message, in which case it would otherwise be unknown whether the output value was
|
|
||||||
/// ever there at all. While the output value will still be required in a later epoch to decide
|
|
||||||
/// the termination state.
|
|
||||||
decision: Option<bool>,
|
|
||||||
/// A cache for messages for future epochs that cannot be handled yet.
|
|
||||||
// TODO: Find a better solution for this; defend against spam.
|
|
||||||
incoming_queue: BTreeMap<u32, Vec<(NodeUid, AgreementContent)>>,
|
|
||||||
/// The values we found in the first _N - f_ `Aux` messages that were in `bin_values`.
|
|
||||||
conf_values: Option<BoolSet>,
|
|
||||||
/// The state of this epoch's common coin.
|
|
||||||
coin_state: CoinState<NodeUid>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type Step<NodeUid> = messaging::Step<Agreement<NodeUid>>;
|
|
||||||
|
|
||||||
impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
|
|
||||||
type NodeUid = NodeUid;
|
|
||||||
type Input = bool;
|
|
||||||
type Output = bool;
|
|
||||||
type Message = AgreementMessage;
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn input(&mut self, input: Self::Input) -> Result<Step<NodeUid>> {
|
|
||||||
self.set_input(input)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Receive input from a remote node.
|
|
||||||
fn handle_message(
|
|
||||||
&mut self,
|
|
||||||
sender_id: &Self::NodeUid,
|
|
||||||
AgreementMessage { epoch, content }: Self::Message,
|
|
||||||
) -> Result<Step<NodeUid>> {
|
|
||||||
if self.decision.is_some() || (epoch < self.epoch && content.can_expire()) {
|
|
||||||
// Message is obsolete: We are already in a later epoch or terminated.
|
|
||||||
Ok(Step::default())
|
|
||||||
} else if epoch > self.epoch {
|
|
||||||
// Message is for a later epoch. We can't handle that yet.
|
|
||||||
let queue = self.incoming_queue.entry(epoch).or_insert_with(Vec::new);
|
|
||||||
queue.push((sender_id.clone(), content));
|
|
||||||
Ok(Step::default())
|
|
||||||
} else {
|
|
||||||
self.handle_message_content(sender_id, content)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Whether the algorithm has terminated.
|
|
||||||
fn terminated(&self) -> bool {
|
|
||||||
self.decision.is_some()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn our_id(&self) -> &Self::NodeUid {
|
|
||||||
self.netinfo.our_uid()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|
||||||
pub fn new(
|
|
||||||
netinfo: Arc<NetworkInfo<NodeUid>>,
|
|
||||||
session_id: u64,
|
|
||||||
proposer_id: NodeUid,
|
|
||||||
) -> Result<Self> {
|
|
||||||
if !netinfo.is_node_validator(&proposer_id) {
|
|
||||||
return Err(Error::UnknownProposer);
|
|
||||||
}
|
|
||||||
Ok(Agreement {
|
|
||||||
netinfo: netinfo.clone(),
|
|
||||||
session_id,
|
|
||||||
proposer_id,
|
|
||||||
epoch: 0,
|
|
||||||
sbv_broadcast: SbvBroadcast::new(netinfo),
|
|
||||||
received_conf: BTreeMap::new(),
|
|
||||||
received_term: BoolMultimap::default(),
|
|
||||||
estimated: None,
|
|
||||||
decision: None,
|
|
||||||
incoming_queue: BTreeMap::new(),
|
|
||||||
conf_values: None,
|
|
||||||
coin_state: CoinState::Decided(true),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sets the input value for agreement.
|
|
||||||
fn set_input(&mut self, input: bool) -> Result<Step<NodeUid>> {
|
|
||||||
if self.epoch != 0 || self.estimated.is_some() {
|
|
||||||
return Err(Error::InputNotAccepted);
|
|
||||||
}
|
|
||||||
// Set the initial estimated value to the input value.
|
|
||||||
self.estimated = Some(input);
|
|
||||||
debug!("{:?}/{:?} Input {}", self.our_id(), self.proposer_id, input);
|
|
||||||
let sbvb_step = self.sbv_broadcast.input(input)?;
|
|
||||||
self.handle_sbvb_step(sbvb_step)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Acceptance check to be performed before setting the input value.
|
|
||||||
pub fn accepts_input(&self) -> bool {
|
|
||||||
self.epoch == 0 && self.estimated.is_none()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Dispatches the message content to the corresponding handling method.
|
|
||||||
fn handle_message_content(
|
|
||||||
&mut self,
|
|
||||||
sender_id: &NodeUid,
|
|
||||||
content: AgreementContent,
|
|
||||||
) -> Result<Step<NodeUid>> {
|
|
||||||
match content {
|
|
||||||
AgreementContent::SbvBroadcast(msg) => self.handle_sbv_broadcast(sender_id, msg),
|
|
||||||
AgreementContent::Conf(v) => self.handle_conf(sender_id, v),
|
|
||||||
AgreementContent::Term(v) => self.handle_term(sender_id, v),
|
|
||||||
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handles a Synchroniced Binary Value Broadcast message.
|
|
||||||
fn handle_sbv_broadcast(
|
|
||||||
&mut self,
|
|
||||||
sender_id: &NodeUid,
|
|
||||||
msg: sbv_broadcast::Message,
|
|
||||||
) -> Result<Step<NodeUid>> {
|
|
||||||
let sbvb_step = self.sbv_broadcast.handle_message(sender_id, msg)?;
|
|
||||||
self.handle_sbvb_step(sbvb_step)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handles a Synchronized Binary Value Broadcast step. On output, starts the `Conf` round or
|
|
||||||
/// decides.
|
|
||||||
fn handle_sbvb_step(
|
|
||||||
&mut self,
|
|
||||||
sbvb_step: sbv_broadcast::Step<NodeUid>,
|
|
||||||
) -> Result<Step<NodeUid>> {
|
|
||||||
let mut step = Step::default();
|
|
||||||
let output = step.extend_with(sbvb_step, |msg| {
|
|
||||||
AgreementContent::SbvBroadcast(msg).with_epoch(self.epoch)
|
|
||||||
});
|
|
||||||
if self.conf_values.is_some() {
|
|
||||||
return Ok(step); // The `Conf` round has already started.
|
|
||||||
}
|
|
||||||
if let Some(aux_vals) = output.into_iter().next() {
|
|
||||||
// Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...`
|
|
||||||
match self.coin_state {
|
|
||||||
CoinState::Decided(_) => {
|
|
||||||
self.conf_values = Some(aux_vals);
|
|
||||||
step.extend(self.try_update_epoch()?)
|
|
||||||
}
|
|
||||||
CoinState::InProgress(_) => {
|
|
||||||
// Start the `Conf` message round.
|
|
||||||
step.extend(self.send_conf(aux_vals)?)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(step)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have
|
|
||||||
/// been received, updates the epoch or decides.
|
|
||||||
fn handle_conf(&mut self, sender_id: &NodeUid, v: BoolSet) -> Result<Step<NodeUid>> {
|
|
||||||
self.received_conf.insert(sender_id.clone(), v);
|
|
||||||
self.try_finish_conf_round()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handles a `Term(v)` message. If we haven't yet decided on a value and there are more than
|
|
||||||
/// _f_ such messages with the same value from different nodes, performs expedite termination:
|
|
||||||
/// decides on `v`, broadcasts `Term(v)` and terminates the instance.
|
|
||||||
fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Result<Step<NodeUid>> {
|
|
||||||
self.received_term[b].insert(sender_id.clone());
|
|
||||||
// Check for the expedite termination condition.
|
|
||||||
if self.decision.is_some() {
|
|
||||||
Ok(Step::default())
|
|
||||||
} else if self.received_term[b].len() > self.netinfo.num_faulty() {
|
|
||||||
Ok(self.decide(b))
|
|
||||||
} else {
|
|
||||||
// Otherwise handle the `Term` as a `BVal`, `Aux` and `Conf`.
|
|
||||||
let mut sbvb_step = self.sbv_broadcast.handle_bval(sender_id, b)?;
|
|
||||||
sbvb_step.extend(self.sbv_broadcast.handle_aux(sender_id, b)?);
|
|
||||||
let mut step = self.handle_sbvb_step(sbvb_step)?;
|
|
||||||
step.extend(self.handle_conf(sender_id, BoolSet::from(b))?);
|
|
||||||
Ok(step)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handles a Common Coin message. If there is output from Common Coin, starts the next
|
|
||||||
/// epoch. The function may output a decision value.
|
|
||||||
fn handle_coin(
|
|
||||||
&mut self,
|
|
||||||
sender_id: &NodeUid,
|
|
||||||
msg: CommonCoinMessage,
|
|
||||||
) -> Result<Step<NodeUid>> {
|
|
||||||
let coin_step = match self.coin_state {
|
|
||||||
CoinState::Decided(_) => return Ok(Step::default()), // Coin value is already decided.
|
|
||||||
CoinState::InProgress(ref mut common_coin) => common_coin
|
|
||||||
.handle_message(sender_id, msg)
|
|
||||||
.map_err(Error::HandleCoinCommonCoin)?,
|
|
||||||
};
|
|
||||||
self.on_coin_step(coin_step)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Multicasts a `Conf(values)` message, and handles it.
|
|
||||||
fn send_conf(&mut self, values: BoolSet) -> Result<Step<NodeUid>> {
|
|
||||||
if self.conf_values.is_some() {
|
|
||||||
// Only one `Conf` message is allowed in an epoch.
|
|
||||||
return Ok(Step::default());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Trigger the start of the `Conf` round.
|
|
||||||
self.conf_values = Some(values);
|
|
||||||
|
|
||||||
if !self.netinfo.is_validator() {
|
|
||||||
return Ok(self.try_finish_conf_round()?);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.send(AgreementContent::Conf(values))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Multicasts and handles a message. Does nothing if we are only an observer.
|
|
||||||
fn send(&mut self, content: AgreementContent) -> Result<Step<NodeUid>> {
|
|
||||||
if !self.netinfo.is_validator() {
|
|
||||||
return Ok(Step::default());
|
|
||||||
}
|
|
||||||
let mut step: Step<_> = Target::All
|
|
||||||
.message(content.clone().with_epoch(self.epoch))
|
|
||||||
.into();
|
|
||||||
let our_uid = &self.netinfo.our_uid().clone();
|
|
||||||
step.extend(self.handle_message_content(our_uid, content)?);
|
|
||||||
Ok(step)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handles a step returned from the `CommonCoin`.
|
|
||||||
fn on_coin_step(
|
|
||||||
&mut self,
|
|
||||||
coin_step: common_coin::Step<NodeUid, Nonce>,
|
|
||||||
) -> Result<Step<NodeUid>> {
|
|
||||||
let mut step = Step::default();
|
|
||||||
let epoch = self.epoch;
|
|
||||||
let to_msg = |c_msg| AgreementContent::Coin(Box::new(c_msg)).with_epoch(epoch);
|
|
||||||
let coin_output = step.extend_with(coin_step, to_msg);
|
|
||||||
if let Some(coin) = coin_output.into_iter().next() {
|
|
||||||
self.coin_state = coin.into();
|
|
||||||
step.extend(self.try_update_epoch()?);
|
|
||||||
}
|
|
||||||
Ok(step)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// If this epoch's coin value or conf values are not known yet, does nothing, otherwise
|
|
||||||
/// updates the epoch or decides.
|
|
||||||
///
|
|
||||||
/// With two conf values, the next epoch's estimate is the coin value. If there is only one conf
|
|
||||||
/// value and that disagrees with the coin, the conf value is the next epoch's estimate. If
|
|
||||||
/// the unique conf value agrees with the coin, terminates and decides on that value.
|
|
||||||
fn try_update_epoch(&mut self) -> Result<Step<NodeUid>> {
|
|
||||||
if self.decision.is_some() {
|
|
||||||
// Avoid an infinite regression without making an Agreement step.
|
|
||||||
return Ok(Step::default());
|
|
||||||
}
|
|
||||||
let coin = match self.coin_state.value() {
|
|
||||||
None => return Ok(Step::default()), // Still waiting for coin value.
|
|
||||||
Some(coin) => coin,
|
|
||||||
};
|
|
||||||
let def_bin_value = match self.conf_values {
|
|
||||||
None => return Ok(Step::default()), // Still waiting for conf value.
|
|
||||||
Some(ref values) => values.definite(),
|
|
||||||
};
|
|
||||||
|
|
||||||
if Some(coin) == def_bin_value {
|
|
||||||
Ok(self.decide(coin))
|
|
||||||
} else {
|
|
||||||
self.update_epoch(def_bin_value.unwrap_or(coin))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates the initial coin state for the current epoch, i.e. sets it to the predetermined
|
|
||||||
/// value, or initializes a `CommonCoin` instance.
|
|
||||||
fn coin_state(&self) -> CoinState<NodeUid> {
|
|
||||||
match self.epoch % 3 {
|
|
||||||
0 => CoinState::Decided(true),
|
|
||||||
1 => CoinState::Decided(false),
|
|
||||||
_ => {
|
|
||||||
let nonce = Nonce::new(
|
|
||||||
self.netinfo.invocation_id().as_ref(),
|
|
||||||
self.session_id,
|
|
||||||
self.netinfo.node_index(&self.proposer_id).unwrap(),
|
|
||||||
self.epoch,
|
|
||||||
);
|
|
||||||
CoinState::InProgress(CommonCoin::new(self.netinfo.clone(), nonce))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Decides on a value and broadcasts a `Term` message with that value.
|
|
||||||
fn decide(&mut self, b: bool) -> Step<NodeUid> {
|
|
||||||
if self.decision.is_some() {
|
|
||||||
return Step::default();
|
|
||||||
}
|
|
||||||
// Output the agreement value.
|
|
||||||
let mut step = Step::default();
|
|
||||||
step.output.push_back(b);
|
|
||||||
// Latch the decided state.
|
|
||||||
self.decision = Some(b);
|
|
||||||
debug!(
|
|
||||||
"{:?}/{:?} (is_validator: {}) decision: {}",
|
|
||||||
self.netinfo.our_uid(),
|
|
||||||
self.proposer_id,
|
|
||||||
self.netinfo.is_validator(),
|
|
||||||
b
|
|
||||||
);
|
|
||||||
if self.netinfo.is_validator() {
|
|
||||||
let msg = AgreementContent::Term(b).with_epoch(self.epoch + 1);
|
|
||||||
step.messages.push_back(Target::All.message(msg));
|
|
||||||
}
|
|
||||||
step
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Checks whether the _N - f_ `Conf` messages have arrived, and if so, activates the coin.
|
|
||||||
fn try_finish_conf_round(&mut self) -> Result<Step<NodeUid>> {
|
|
||||||
if self.conf_values.is_none() || self.count_conf() < self.netinfo.num_correct() {
|
|
||||||
return Ok(Step::default());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Invoke the common coin.
|
|
||||||
let coin_step = match self.coin_state {
|
|
||||||
CoinState::Decided(_) => return Ok(Step::default()), // Coin has already decided.
|
|
||||||
CoinState::InProgress(ref mut common_coin) => common_coin
|
|
||||||
.input(())
|
|
||||||
.map_err(Error::TryFinishConfRoundCommonCoin)?,
|
|
||||||
};
|
|
||||||
let mut step = self.on_coin_step(coin_step)?;
|
|
||||||
step.extend(self.try_update_epoch()?);
|
|
||||||
Ok(step)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Counts the number of received `Conf` messages with values in `bin_values`.
|
|
||||||
fn count_conf(&self) -> usize {
|
|
||||||
let is_bin_val = |conf: &&BoolSet| conf.is_subset(self.sbv_broadcast.bin_values());
|
|
||||||
self.received_conf.values().filter(is_bin_val).count()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Increments the epoch, sets the new estimate and handles queued messages.
|
|
||||||
fn update_epoch(&mut self, b: bool) -> Result<Step<NodeUid>> {
|
|
||||||
self.sbv_broadcast.clear(&self.received_term);
|
|
||||||
self.received_conf.clear();
|
|
||||||
for (v, id) in &self.received_term {
|
|
||||||
self.received_conf.insert(id.clone(), BoolSet::from(v));
|
|
||||||
}
|
|
||||||
self.conf_values = None;
|
|
||||||
self.epoch += 1;
|
|
||||||
self.coin_state = self.coin_state();
|
|
||||||
debug!(
|
|
||||||
"{:?} Agreement instance {:?} started epoch {}, {} terminated",
|
|
||||||
self.netinfo.our_uid(),
|
|
||||||
self.proposer_id,
|
|
||||||
self.epoch,
|
|
||||||
self.received_conf.len(),
|
|
||||||
);
|
|
||||||
|
|
||||||
self.estimated = Some(b);
|
|
||||||
let sbvb_step = self.sbv_broadcast.input(b)?;
|
|
||||||
let mut step = self.handle_sbvb_step(sbvb_step)?;
|
|
||||||
let queued_msgs = Itertools::flatten(self.incoming_queue.remove(&self.epoch).into_iter());
|
|
||||||
for (sender_id, content) in queued_msgs {
|
|
||||||
step.extend(self.handle_message_content(&sender_id, content)?);
|
|
||||||
if self.decision.is_some() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(step)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
struct Nonce(Vec<u8>);
|
struct Nonce(Vec<u8>);
|
||||||
|
|
||||||
|
|
|
@ -541,8 +541,7 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
})
|
}).collect();
|
||||||
.collect();
|
|
||||||
if let Some(value) =
|
if let Some(value) =
|
||||||
decode_from_shards(&mut leaf_values, &self.coding, self.data_shard_num, hash)
|
decode_from_shards(&mut leaf_values, &self.coding, self.data_shard_num, hash)
|
||||||
{
|
{
|
||||||
|
|
|
@ -129,7 +129,8 @@ impl<NodeUid: Clone + Debug + Ord + Rand> CommonSubset<NodeUid> {
|
||||||
for proposer_id in netinfo.all_uids() {
|
for proposer_id in netinfo.all_uids() {
|
||||||
broadcast_instances.insert(
|
broadcast_instances.insert(
|
||||||
proposer_id.clone(),
|
proposer_id.clone(),
|
||||||
Broadcast::new(netinfo.clone(), proposer_id.clone()).map_err(Error::NewBroadcast)?,
|
Broadcast::new(netinfo.clone(), proposer_id.clone())
|
||||||
|
.map_err(Error::NewBroadcast)?,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,419 @@
|
||||||
|
use rand::Rand;
|
||||||
|
use std::fmt::Debug;
|
||||||
|
use std::hash::Hash;
|
||||||
|
use std::mem;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use bincode;
|
||||||
|
use crypto::Signature;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use super::votes::{SignedVote, VoteCounter};
|
||||||
|
use super::{
|
||||||
|
Batch, Change, ChangeState, DynamicHoneyBadgerBuilder, Error, ErrorKind, Input,
|
||||||
|
InternalContrib, KeyGenMessage, KeyGenState, Message, Result, SignedKeyGenMsg, Step,
|
||||||
|
};
|
||||||
|
use fault_log::{Fault, FaultKind, FaultLog};
|
||||||
|
use honey_badger::{self, HoneyBadger, Message as HbMessage};
|
||||||
|
use messaging::{DistAlgorithm, NetworkInfo, Target};
|
||||||
|
use sync_key_gen::{Ack, Part, PartOutcome, SyncKeyGen};
|
||||||
|
|
||||||
|
/// A Honey Badger instance that can handle adding and removing nodes.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct DynamicHoneyBadger<C, NodeUid: Rand> {
|
||||||
|
/// Shared network data.
|
||||||
|
pub(super) netinfo: NetworkInfo<NodeUid>,
|
||||||
|
/// The maximum number of future epochs for which we handle messages simultaneously.
|
||||||
|
pub(super) max_future_epochs: usize,
|
||||||
|
/// The first epoch after the latest node change.
|
||||||
|
pub(super) start_epoch: u64,
|
||||||
|
/// The buffer and counter for the pending and committed change votes.
|
||||||
|
pub(super) vote_counter: VoteCounter<NodeUid>,
|
||||||
|
/// Pending node transactions that we will propose in the next epoch.
|
||||||
|
pub(super) key_gen_msg_buffer: Vec<SignedKeyGenMsg<NodeUid>>,
|
||||||
|
/// The `HoneyBadger` instance with the current set of nodes.
|
||||||
|
pub(super) honey_badger: HoneyBadger<InternalContrib<C, NodeUid>, NodeUid>,
|
||||||
|
/// The current key generation process, and the change it applies to.
|
||||||
|
pub(super) key_gen_state: Option<KeyGenState<NodeUid>>,
|
||||||
|
/// A queue for messages from future epochs that cannot be handled yet.
|
||||||
|
pub(super) incoming_queue: Vec<(NodeUid, Message<NodeUid>)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<C, NodeUid> DistAlgorithm for DynamicHoneyBadger<C, NodeUid>
|
||||||
|
where
|
||||||
|
C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
|
||||||
|
NodeUid: Eq + Ord + Clone + Serialize + for<'r> Deserialize<'r> + Debug + Hash + Rand,
|
||||||
|
{
|
||||||
|
type NodeUid = NodeUid;
|
||||||
|
type Input = Input<C, NodeUid>;
|
||||||
|
type Output = Batch<C, NodeUid>;
|
||||||
|
type Message = Message<NodeUid>;
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn input(&mut self, input: Self::Input) -> Result<Step<C, NodeUid>> {
|
||||||
|
// User contributions are forwarded to `HoneyBadger` right away. Votes are signed and
|
||||||
|
// broadcast.
|
||||||
|
match input {
|
||||||
|
Input::User(contrib) => self.propose(contrib),
|
||||||
|
Input::Change(change) => self.vote_for(change),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_message(
|
||||||
|
&mut self,
|
||||||
|
sender_id: &NodeUid,
|
||||||
|
message: Self::Message,
|
||||||
|
) -> Result<Step<C, NodeUid>> {
|
||||||
|
let epoch = message.start_epoch();
|
||||||
|
if epoch < self.start_epoch {
|
||||||
|
// Obsolete message.
|
||||||
|
Ok(Step::default())
|
||||||
|
} else if epoch > self.start_epoch {
|
||||||
|
// Message cannot be handled yet. Save it for later.
|
||||||
|
let entry = (sender_id.clone(), message);
|
||||||
|
self.incoming_queue.push(entry);
|
||||||
|
Ok(Step::default())
|
||||||
|
} else {
|
||||||
|
match message {
|
||||||
|
Message::HoneyBadger(_, hb_msg) => {
|
||||||
|
self.handle_honey_badger_message(sender_id, hb_msg)
|
||||||
|
}
|
||||||
|
Message::KeyGen(_, kg_msg, sig) => self
|
||||||
|
.handle_key_gen_message(sender_id, kg_msg, *sig)
|
||||||
|
.map(FaultLog::into),
|
||||||
|
Message::SignedVote(signed_vote) => self
|
||||||
|
.vote_counter
|
||||||
|
.add_pending_vote(sender_id, signed_vote)
|
||||||
|
.map(FaultLog::into),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn terminated(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
fn our_id(&self) -> &NodeUid {
|
||||||
|
self.netinfo.our_uid()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<C, NodeUid> DynamicHoneyBadger<C, NodeUid>
|
||||||
|
where
|
||||||
|
C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
|
||||||
|
NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash + Rand,
|
||||||
|
{
|
||||||
|
/// Returns a new `DynamicHoneyBadgerBuilder`.
|
||||||
|
pub fn builder() -> DynamicHoneyBadgerBuilder<C, NodeUid> {
|
||||||
|
DynamicHoneyBadgerBuilder::new()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns `true` if input for the current epoch has already been provided.
|
||||||
|
pub fn has_input(&self) -> bool {
|
||||||
|
self.honey_badger.has_input()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Proposes a contribution in the current epoch.
|
||||||
|
pub fn propose(&mut self, contrib: C) -> Result<Step<C, NodeUid>> {
|
||||||
|
let step = self
|
||||||
|
.honey_badger
|
||||||
|
.input(InternalContrib {
|
||||||
|
contrib,
|
||||||
|
key_gen_messages: self.key_gen_msg_buffer.clone(),
|
||||||
|
votes: self.vote_counter.pending_votes().cloned().collect(),
|
||||||
|
}).map_err(ErrorKind::ProposeHoneyBadger)?;
|
||||||
|
self.process_output(step)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cast a vote to change the set of validators.
|
||||||
|
pub fn vote_for(&mut self, change: Change<NodeUid>) -> Result<Step<C, NodeUid>> {
|
||||||
|
if !self.netinfo.is_validator() {
|
||||||
|
return Ok(Step::default()); // TODO: Return an error?
|
||||||
|
}
|
||||||
|
let signed_vote = self.vote_counter.sign_vote_for(change)?.clone();
|
||||||
|
let msg = Message::SignedVote(signed_vote);
|
||||||
|
Ok(Target::All.message(msg).into())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the information about the node IDs in the network, and the cryptographic keys.
|
||||||
|
pub fn netinfo(&self) -> &NetworkInfo<NodeUid> {
|
||||||
|
&self.netinfo
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns `true` if we should make our contribution for the next epoch, even if we don't have
|
||||||
|
/// content ourselves, to avoid stalling the network.
|
||||||
|
///
|
||||||
|
/// By proposing only if this returns `true`, you can prevent an adversary from making the
|
||||||
|
/// network output empty baches indefinitely, but it also means that the network won't advance
|
||||||
|
/// if fewer than _f + 1_ nodes have pending contributions.
|
||||||
|
pub fn should_propose(&self) -> bool {
|
||||||
|
if self.has_input() {
|
||||||
|
return false; // We have already proposed.
|
||||||
|
}
|
||||||
|
if self.honey_badger.received_proposals() > self.netinfo.num_faulty() {
|
||||||
|
return true; // At least one correct node wants to move on to the next epoch.
|
||||||
|
}
|
||||||
|
let is_our_vote = |signed_vote: &SignedVote<_>| signed_vote.voter() == self.our_id();
|
||||||
|
if self.vote_counter.pending_votes().any(is_our_vote) {
|
||||||
|
return true; // We have pending input to vote for a validator change.
|
||||||
|
}
|
||||||
|
let kgs = match self.key_gen_state {
|
||||||
|
None => return false, // No ongoing key generation.
|
||||||
|
Some(ref kgs) => kgs,
|
||||||
|
};
|
||||||
|
// If either we or the candidate have a pending key gen message, we should propose.
|
||||||
|
let ours_or_candidates = |msg: &SignedKeyGenMsg<_>| {
|
||||||
|
msg.1 == *self.our_id() || Some(&msg.1) == kgs.change.candidate()
|
||||||
|
};
|
||||||
|
self.key_gen_msg_buffer.iter().any(ours_or_candidates)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles a message for the `HoneyBadger` instance.
|
||||||
|
fn handle_honey_badger_message(
|
||||||
|
&mut self,
|
||||||
|
sender_id: &NodeUid,
|
||||||
|
message: HbMessage<NodeUid>,
|
||||||
|
) -> Result<Step<C, NodeUid>> {
|
||||||
|
if !self.netinfo.is_node_validator(sender_id) {
|
||||||
|
info!("Unknown sender {:?} of message {:?}", sender_id, message);
|
||||||
|
return Err(ErrorKind::UnknownSender.into());
|
||||||
|
}
|
||||||
|
// Handle the message.
|
||||||
|
let step = self
|
||||||
|
.honey_badger
|
||||||
|
.handle_message(sender_id, message)
|
||||||
|
.map_err(ErrorKind::HandleHoneyBadgerMessageHoneyBadger)?;
|
||||||
|
self.process_output(step)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles a vote or key generation message and tries to commit it as a transaction. These
|
||||||
|
/// messages are only handled once they appear in a batch output from Honey Badger.
|
||||||
|
fn handle_key_gen_message(
|
||||||
|
&mut self,
|
||||||
|
sender_id: &NodeUid,
|
||||||
|
kg_msg: KeyGenMessage,
|
||||||
|
sig: Signature,
|
||||||
|
) -> Result<FaultLog<NodeUid>> {
|
||||||
|
if !self.verify_signature(sender_id, &sig, &kg_msg)? {
|
||||||
|
info!("Invalid signature from {:?} for: {:?}.", sender_id, kg_msg);
|
||||||
|
let fault_kind = FaultKind::InvalidKeyGenMessageSignature;
|
||||||
|
return Ok(Fault::new(sender_id.clone(), fault_kind).into());
|
||||||
|
}
|
||||||
|
let kgs = match self.key_gen_state {
|
||||||
|
Some(ref mut kgs) => kgs,
|
||||||
|
None => {
|
||||||
|
info!(
|
||||||
|
"Unexpected key gen message from {:?}: {:?}.",
|
||||||
|
sender_id, kg_msg
|
||||||
|
);
|
||||||
|
return Ok(Fault::new(sender_id.clone(), FaultKind::UnexpectedKeyGenMessage).into());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// If the joining node is correct, it will send at most (N + 1)² + 1 key generation
|
||||||
|
// messages.
|
||||||
|
if Some(sender_id) == kgs.change.candidate() {
|
||||||
|
let n = self.netinfo.num_nodes() + 1;
|
||||||
|
if kgs.candidate_msg_count > n * n {
|
||||||
|
info!(
|
||||||
|
"Too many key gen messages from candidate {:?}: {:?}.",
|
||||||
|
sender_id, kg_msg
|
||||||
|
);
|
||||||
|
let fault_kind = FaultKind::TooManyCandidateKeyGenMessages;
|
||||||
|
return Ok(Fault::new(sender_id.clone(), fault_kind).into());
|
||||||
|
}
|
||||||
|
kgs.candidate_msg_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
let tx = SignedKeyGenMsg(self.start_epoch, sender_id.clone(), kg_msg, sig);
|
||||||
|
self.key_gen_msg_buffer.push(tx);
|
||||||
|
Ok(FaultLog::default())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Processes all pending batches output by Honey Badger.
|
||||||
|
fn process_output(
|
||||||
|
&mut self,
|
||||||
|
hb_step: honey_badger::Step<InternalContrib<C, NodeUid>, NodeUid>,
|
||||||
|
) -> Result<Step<C, NodeUid>> {
|
||||||
|
let mut step: Step<C, NodeUid> = Step::default();
|
||||||
|
let start_epoch = self.start_epoch;
|
||||||
|
let output = step.extend_with(hb_step, |hb_msg| Message::HoneyBadger(start_epoch, hb_msg));
|
||||||
|
for hb_batch in output {
|
||||||
|
// Create the batch we output ourselves. It will contain the _user_ transactions of
|
||||||
|
// `hb_batch`, and the current change state.
|
||||||
|
let mut batch = Batch::new(hb_batch.epoch + self.start_epoch);
|
||||||
|
|
||||||
|
// Add the user transactions to `batch` and handle votes and DKG messages.
|
||||||
|
for (id, int_contrib) in hb_batch.contributions {
|
||||||
|
let InternalContrib {
|
||||||
|
votes,
|
||||||
|
key_gen_messages,
|
||||||
|
contrib,
|
||||||
|
} = int_contrib;
|
||||||
|
step.fault_log
|
||||||
|
.extend(self.vote_counter.add_committed_votes(&id, votes)?);
|
||||||
|
batch.contributions.insert(id.clone(), contrib);
|
||||||
|
self.key_gen_msg_buffer
|
||||||
|
.retain(|skgm| !key_gen_messages.contains(skgm));
|
||||||
|
for SignedKeyGenMsg(epoch, s_id, kg_msg, sig) in key_gen_messages {
|
||||||
|
if epoch < self.start_epoch {
|
||||||
|
info!("Obsolete key generation message: {:?}.", kg_msg);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if !self.verify_signature(&s_id, &sig, &kg_msg)? {
|
||||||
|
info!(
|
||||||
|
"Invalid signature in {:?}'s batch from {:?} for: {:?}.",
|
||||||
|
id, s_id, kg_msg
|
||||||
|
);
|
||||||
|
let fault_kind = FaultKind::InvalidKeyGenMessageSignature;
|
||||||
|
step.fault_log.append(id.clone(), fault_kind);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
step.extend(match kg_msg {
|
||||||
|
KeyGenMessage::Part(part) => self.handle_part(&s_id, part)?,
|
||||||
|
KeyGenMessage::Ack(ack) => self.handle_ack(&s_id, ack)?.into(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(kgs) = self.take_ready_key_gen() {
|
||||||
|
// If DKG completed, apply the change, restart Honey Badger, and inform the user.
|
||||||
|
debug!("{:?} DKG for {:?} complete!", self.our_id(), kgs.change);
|
||||||
|
self.netinfo = kgs.key_gen.into_network_info();
|
||||||
|
self.restart_honey_badger(batch.epoch + 1);
|
||||||
|
batch.set_change(ChangeState::Complete(kgs.change), &self.netinfo);
|
||||||
|
} else if let Some(change) = self.vote_counter.compute_winner().cloned() {
|
||||||
|
// If there is a new change, restart DKG. Inform the user about the current change.
|
||||||
|
step.extend(self.update_key_gen(batch.epoch + 1, &change)?);
|
||||||
|
batch.set_change(ChangeState::InProgress(change), &self.netinfo);
|
||||||
|
}
|
||||||
|
step.output.push_back(batch);
|
||||||
|
}
|
||||||
|
// If `start_epoch` changed, we can now handle some queued messages.
|
||||||
|
if start_epoch < self.start_epoch {
|
||||||
|
let queue = mem::replace(&mut self.incoming_queue, Vec::new());
|
||||||
|
for (sender_id, msg) in queue {
|
||||||
|
step.extend(self.handle_message(&sender_id, msg)?);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(step)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// If the winner of the vote has changed, restarts Key Generation for the set of nodes implied
|
||||||
|
/// by the current change.
|
||||||
|
pub(super) fn update_key_gen(
|
||||||
|
&mut self,
|
||||||
|
epoch: u64,
|
||||||
|
change: &Change<NodeUid>,
|
||||||
|
) -> Result<Step<C, NodeUid>> {
|
||||||
|
if self.key_gen_state.as_ref().map(|kgs| &kgs.change) == Some(change) {
|
||||||
|
return Ok(Step::default()); // The change is the same as before. Continue DKG as is.
|
||||||
|
}
|
||||||
|
debug!("{:?} Restarting DKG for {:?}.", self.our_id(), change);
|
||||||
|
// Use the existing key shares - with the change applied - as keys for DKG.
|
||||||
|
let mut pub_keys = self.netinfo.public_key_map().clone();
|
||||||
|
if match *change {
|
||||||
|
Change::Remove(ref id) => pub_keys.remove(id).is_none(),
|
||||||
|
Change::Add(ref id, ref pk) => pub_keys.insert(id.clone(), pk.clone()).is_some(),
|
||||||
|
} {
|
||||||
|
info!("{:?} No-op change: {:?}", self.our_id(), change);
|
||||||
|
}
|
||||||
|
self.restart_honey_badger(epoch);
|
||||||
|
// TODO: This needs to be the same as `num_faulty` will be in the _new_
|
||||||
|
// `NetworkInfo` if the change goes through. It would be safer to deduplicate.
|
||||||
|
let threshold = (pub_keys.len() - 1) / 3;
|
||||||
|
let sk = self.netinfo.secret_key().clone();
|
||||||
|
let our_uid = self.our_id().clone();
|
||||||
|
let (key_gen, part) = SyncKeyGen::new(our_uid, sk, pub_keys, threshold);
|
||||||
|
self.key_gen_state = Some(KeyGenState::new(key_gen, change.clone()));
|
||||||
|
if let Some(part) = part {
|
||||||
|
self.send_transaction(KeyGenMessage::Part(part))
|
||||||
|
} else {
|
||||||
|
Ok(Step::default())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Starts a new `HoneyBadger` instance and resets the vote counter.
|
||||||
|
fn restart_honey_badger(&mut self, epoch: u64) {
|
||||||
|
self.start_epoch = epoch;
|
||||||
|
self.key_gen_msg_buffer.retain(|kg_msg| kg_msg.0 >= epoch);
|
||||||
|
let netinfo = Arc::new(self.netinfo.clone());
|
||||||
|
let counter = VoteCounter::new(netinfo.clone(), epoch);
|
||||||
|
mem::replace(&mut self.vote_counter, counter);
|
||||||
|
self.honey_badger = HoneyBadger::builder(netinfo)
|
||||||
|
.max_future_epochs(self.max_future_epochs)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles a `Part` message that was output by Honey Badger.
|
||||||
|
fn handle_part(&mut self, sender_id: &NodeUid, part: Part) -> Result<Step<C, NodeUid>> {
|
||||||
|
let handle = |kgs: &mut KeyGenState<NodeUid>| kgs.key_gen.handle_part(&sender_id, part);
|
||||||
|
match self.key_gen_state.as_mut().and_then(handle) {
|
||||||
|
Some(PartOutcome::Valid(ack)) => self.send_transaction(KeyGenMessage::Ack(ack)),
|
||||||
|
Some(PartOutcome::Invalid(fault_log)) => Ok(fault_log.into()),
|
||||||
|
None => Ok(Step::default()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles an `Ack` message that was output by Honey Badger.
|
||||||
|
fn handle_ack(&mut self, sender_id: &NodeUid, ack: Ack) -> Result<FaultLog<NodeUid>> {
|
||||||
|
if let Some(kgs) = self.key_gen_state.as_mut() {
|
||||||
|
Ok(kgs.key_gen.handle_ack(sender_id, ack))
|
||||||
|
} else {
|
||||||
|
Ok(FaultLog::new())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Signs and sends a `KeyGenMessage` and also tries to commit it.
|
||||||
|
fn send_transaction(&mut self, kg_msg: KeyGenMessage) -> Result<Step<C, NodeUid>> {
|
||||||
|
let ser =
|
||||||
|
bincode::serialize(&kg_msg).map_err(|err| ErrorKind::SendTransactionBincode(*err))?;
|
||||||
|
let sig = Box::new(self.netinfo.secret_key().sign(ser));
|
||||||
|
if self.netinfo.is_validator() {
|
||||||
|
let our_uid = self.netinfo.our_uid().clone();
|
||||||
|
let signed_msg =
|
||||||
|
SignedKeyGenMsg(self.start_epoch, our_uid, kg_msg.clone(), *sig.clone());
|
||||||
|
self.key_gen_msg_buffer.push(signed_msg);
|
||||||
|
}
|
||||||
|
let msg = Message::KeyGen(self.start_epoch, kg_msg, sig);
|
||||||
|
Ok(Target::All.message(msg).into())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// If the current Key Generation process is ready, returns the `KeyGenState`.
|
||||||
|
///
|
||||||
|
/// We require the minimum number of completed proposals (`SyncKeyGen::is_ready`) and if a new
|
||||||
|
/// node is joining, we require in addition that the new node's proposal is complete. That way
|
||||||
|
/// the new node knows that it's key is secret, without having to trust any number of nodes.
|
||||||
|
fn take_ready_key_gen(&mut self) -> Option<KeyGenState<NodeUid>> {
|
||||||
|
if self
|
||||||
|
.key_gen_state
|
||||||
|
.as_ref()
|
||||||
|
.map_or(false, KeyGenState::is_ready)
|
||||||
|
{
|
||||||
|
self.key_gen_state.take()
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns `true` if the signature of `kg_msg` by the node with the specified ID is valid.
|
||||||
|
/// Returns an error if the payload fails to serialize.
|
||||||
|
///
|
||||||
|
/// This accepts signatures from both validators and the currently joining candidate, if any.
|
||||||
|
fn verify_signature(
|
||||||
|
&self,
|
||||||
|
node_id: &NodeUid,
|
||||||
|
sig: &Signature,
|
||||||
|
kg_msg: &KeyGenMessage,
|
||||||
|
) -> Result<bool> {
|
||||||
|
let ser =
|
||||||
|
bincode::serialize(kg_msg).map_err(|err| ErrorKind::VerifySignatureBincode(*err))?;
|
||||||
|
let get_candidate_key = || {
|
||||||
|
self.key_gen_state
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|kgs| kgs.candidate_key(node_id))
|
||||||
|
};
|
||||||
|
let pk_opt = self.netinfo.public_key(node_id).or_else(get_candidate_key);
|
||||||
|
Ok(pk_opt.map_or(false, |pk| pk.verify(&sig, ser)))
|
||||||
|
}
|
||||||
|
}
|
|
@ -55,33 +55,30 @@
|
||||||
//! and replaced by a new one with the new set of participants. If a different change wins a
|
//! and replaced by a new one with the new set of participants. If a different change wins a
|
||||||
//! vote before that happens, key generation resets again, and is attempted for the new change.
|
//! vote before that happens, key generation resets again, and is attempted for the new change.
|
||||||
|
|
||||||
|
mod batch;
|
||||||
|
mod builder;
|
||||||
|
mod change;
|
||||||
|
mod dynamic_honey_badger;
|
||||||
|
mod error;
|
||||||
|
mod votes;
|
||||||
|
|
||||||
|
use crypto::{PublicKey, PublicKeySet, Signature};
|
||||||
use rand::Rand;
|
use rand::Rand;
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::hash::Hash;
|
|
||||||
use std::mem;
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use bincode;
|
|
||||||
use crypto::{PublicKey, PublicKeySet, Signature};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
|
|
||||||
use self::votes::{SignedVote, VoteCounter};
|
use self::votes::{SignedVote, VoteCounter};
|
||||||
use fault_log::{Fault, FaultKind, FaultLog};
|
use honey_badger::Message as HbMessage;
|
||||||
use honey_badger::{self, HoneyBadger, Message as HbMessage};
|
use messaging;
|
||||||
use messaging::{self, DistAlgorithm, NetworkInfo, Target};
|
use sync_key_gen::{Ack, Part, SyncKeyGen};
|
||||||
use sync_key_gen::{Ack, Part, PartOutcome, SyncKeyGen};
|
|
||||||
|
|
||||||
pub use self::batch::Batch;
|
pub use self::batch::Batch;
|
||||||
pub use self::builder::DynamicHoneyBadgerBuilder;
|
pub use self::builder::DynamicHoneyBadgerBuilder;
|
||||||
pub use self::change::{Change, ChangeState};
|
pub use self::change::{Change, ChangeState};
|
||||||
|
pub use self::dynamic_honey_badger::DynamicHoneyBadger;
|
||||||
pub use self::error::{Error, ErrorKind, Result};
|
pub use self::error::{Error, ErrorKind, Result};
|
||||||
|
|
||||||
mod batch;
|
pub type Step<C, NodeUid> = messaging::Step<DynamicHoneyBadger<C, NodeUid>>;
|
||||||
mod builder;
|
|
||||||
mod change;
|
|
||||||
mod error;
|
|
||||||
mod votes;
|
|
||||||
|
|
||||||
/// The user input for `DynamicHoneyBadger`.
|
/// The user input for `DynamicHoneyBadger`.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
@ -92,421 +89,6 @@ pub enum Input<C, NodeUid> {
|
||||||
Change(Change<NodeUid>),
|
Change(Change<NodeUid>),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A Honey Badger instance that can handle adding and removing nodes.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct DynamicHoneyBadger<C, NodeUid: Rand> {
|
|
||||||
/// Shared network data.
|
|
||||||
netinfo: NetworkInfo<NodeUid>,
|
|
||||||
/// The maximum number of future epochs for which we handle messages simultaneously.
|
|
||||||
max_future_epochs: usize,
|
|
||||||
/// The first epoch after the latest node change.
|
|
||||||
start_epoch: u64,
|
|
||||||
/// The buffer and counter for the pending and committed change votes.
|
|
||||||
vote_counter: VoteCounter<NodeUid>,
|
|
||||||
/// Pending node transactions that we will propose in the next epoch.
|
|
||||||
key_gen_msg_buffer: Vec<SignedKeyGenMsg<NodeUid>>,
|
|
||||||
/// The `HoneyBadger` instance with the current set of nodes.
|
|
||||||
honey_badger: HoneyBadger<InternalContrib<C, NodeUid>, NodeUid>,
|
|
||||||
/// The current key generation process, and the change it applies to.
|
|
||||||
key_gen_state: Option<KeyGenState<NodeUid>>,
|
|
||||||
/// A queue for messages from future epochs that cannot be handled yet.
|
|
||||||
incoming_queue: Vec<(NodeUid, Message<NodeUid>)>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type Step<C, NodeUid> = messaging::Step<DynamicHoneyBadger<C, NodeUid>>;
|
|
||||||
|
|
||||||
impl<C, NodeUid> DistAlgorithm for DynamicHoneyBadger<C, NodeUid>
|
|
||||||
where
|
|
||||||
C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
|
|
||||||
NodeUid: Eq + Ord + Clone + Serialize + for<'r> Deserialize<'r> + Debug + Hash + Rand,
|
|
||||||
{
|
|
||||||
type NodeUid = NodeUid;
|
|
||||||
type Input = Input<C, NodeUid>;
|
|
||||||
type Output = Batch<C, NodeUid>;
|
|
||||||
type Message = Message<NodeUid>;
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn input(&mut self, input: Self::Input) -> Result<Step<C, NodeUid>> {
|
|
||||||
// User contributions are forwarded to `HoneyBadger` right away. Votes are signed and
|
|
||||||
// broadcast.
|
|
||||||
match input {
|
|
||||||
Input::User(contrib) => self.propose(contrib),
|
|
||||||
Input::Change(change) => self.vote_for(change),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_message(
|
|
||||||
&mut self,
|
|
||||||
sender_id: &NodeUid,
|
|
||||||
message: Self::Message,
|
|
||||||
) -> Result<Step<C, NodeUid>> {
|
|
||||||
let epoch = message.start_epoch();
|
|
||||||
if epoch < self.start_epoch {
|
|
||||||
// Obsolete message.
|
|
||||||
Ok(Step::default())
|
|
||||||
} else if epoch > self.start_epoch {
|
|
||||||
// Message cannot be handled yet. Save it for later.
|
|
||||||
let entry = (sender_id.clone(), message);
|
|
||||||
self.incoming_queue.push(entry);
|
|
||||||
Ok(Step::default())
|
|
||||||
} else {
|
|
||||||
match message {
|
|
||||||
Message::HoneyBadger(_, hb_msg) => {
|
|
||||||
self.handle_honey_badger_message(sender_id, hb_msg)
|
|
||||||
}
|
|
||||||
Message::KeyGen(_, kg_msg, sig) => self
|
|
||||||
.handle_key_gen_message(sender_id, kg_msg, *sig)
|
|
||||||
.map(FaultLog::into),
|
|
||||||
Message::SignedVote(signed_vote) => self
|
|
||||||
.vote_counter
|
|
||||||
.add_pending_vote(sender_id, signed_vote)
|
|
||||||
.map(FaultLog::into),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn terminated(&self) -> bool {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
fn our_id(&self) -> &NodeUid {
|
|
||||||
self.netinfo.our_uid()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C, NodeUid> DynamicHoneyBadger<C, NodeUid>
|
|
||||||
where
|
|
||||||
C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
|
|
||||||
NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash + Rand,
|
|
||||||
{
|
|
||||||
/// Returns a new `DynamicHoneyBadgerBuilder`.
|
|
||||||
pub fn builder() -> DynamicHoneyBadgerBuilder<C, NodeUid> {
|
|
||||||
DynamicHoneyBadgerBuilder::new()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns `true` if input for the current epoch has already been provided.
|
|
||||||
pub fn has_input(&self) -> bool {
|
|
||||||
self.honey_badger.has_input()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Proposes a contribution in the current epoch.
|
|
||||||
pub fn propose(&mut self, contrib: C) -> Result<Step<C, NodeUid>> {
|
|
||||||
let step = self
|
|
||||||
.honey_badger
|
|
||||||
.input(InternalContrib {
|
|
||||||
contrib,
|
|
||||||
key_gen_messages: self.key_gen_msg_buffer.clone(),
|
|
||||||
votes: self.vote_counter.pending_votes().cloned().collect(),
|
|
||||||
})
|
|
||||||
.map_err(ErrorKind::ProposeHoneyBadger)?;
|
|
||||||
self.process_output(step)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Cast a vote to change the set of validators.
|
|
||||||
pub fn vote_for(&mut self, change: Change<NodeUid>) -> Result<Step<C, NodeUid>> {
|
|
||||||
if !self.netinfo.is_validator() {
|
|
||||||
return Ok(Step::default()); // TODO: Return an error?
|
|
||||||
}
|
|
||||||
let signed_vote = self.vote_counter.sign_vote_for(change)?.clone();
|
|
||||||
let msg = Message::SignedVote(signed_vote);
|
|
||||||
Ok(Target::All.message(msg).into())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the information about the node IDs in the network, and the cryptographic keys.
|
|
||||||
pub fn netinfo(&self) -> &NetworkInfo<NodeUid> {
|
|
||||||
&self.netinfo
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns `true` if we should make our contribution for the next epoch, even if we don't have
|
|
||||||
/// content ourselves, to avoid stalling the network.
|
|
||||||
///
|
|
||||||
/// By proposing only if this returns `true`, you can prevent an adversary from making the
|
|
||||||
/// network output empty baches indefinitely, but it also means that the network won't advance
|
|
||||||
/// if fewer than _f + 1_ nodes have pending contributions.
|
|
||||||
pub fn should_propose(&self) -> bool {
|
|
||||||
if self.has_input() {
|
|
||||||
return false; // We have already proposed.
|
|
||||||
}
|
|
||||||
if self.honey_badger.received_proposals() > self.netinfo.num_faulty() {
|
|
||||||
return true; // At least one correct node wants to move on to the next epoch.
|
|
||||||
}
|
|
||||||
let is_our_vote = |signed_vote: &SignedVote<_>| signed_vote.voter() == self.our_id();
|
|
||||||
if self.vote_counter.pending_votes().any(is_our_vote) {
|
|
||||||
return true; // We have pending input to vote for a validator change.
|
|
||||||
}
|
|
||||||
let kgs = match self.key_gen_state {
|
|
||||||
None => return false, // No ongoing key generation.
|
|
||||||
Some(ref kgs) => kgs,
|
|
||||||
};
|
|
||||||
// If either we or the candidate have a pending key gen message, we should propose.
|
|
||||||
let ours_or_candidates = |msg: &SignedKeyGenMsg<_>| {
|
|
||||||
msg.1 == *self.our_id() || Some(&msg.1) == kgs.change.candidate()
|
|
||||||
};
|
|
||||||
self.key_gen_msg_buffer.iter().any(ours_or_candidates)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handles a message for the `HoneyBadger` instance.
|
|
||||||
fn handle_honey_badger_message(
|
|
||||||
&mut self,
|
|
||||||
sender_id: &NodeUid,
|
|
||||||
message: HbMessage<NodeUid>,
|
|
||||||
) -> Result<Step<C, NodeUid>> {
|
|
||||||
if !self.netinfo.is_node_validator(sender_id) {
|
|
||||||
info!("Unknown sender {:?} of message {:?}", sender_id, message);
|
|
||||||
return Err(ErrorKind::UnknownSender.into());
|
|
||||||
}
|
|
||||||
// Handle the message.
|
|
||||||
let step = self
|
|
||||||
.honey_badger
|
|
||||||
.handle_message(sender_id, message)
|
|
||||||
.map_err(ErrorKind::HandleHoneyBadgerMessageHoneyBadger)?;
|
|
||||||
self.process_output(step)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handles a vote or key generation message and tries to commit it as a transaction. These
|
|
||||||
/// messages are only handled once they appear in a batch output from Honey Badger.
|
|
||||||
fn handle_key_gen_message(
|
|
||||||
&mut self,
|
|
||||||
sender_id: &NodeUid,
|
|
||||||
kg_msg: KeyGenMessage,
|
|
||||||
sig: Signature,
|
|
||||||
) -> Result<FaultLog<NodeUid>> {
|
|
||||||
if !self.verify_signature(sender_id, &sig, &kg_msg)? {
|
|
||||||
info!("Invalid signature from {:?} for: {:?}.", sender_id, kg_msg);
|
|
||||||
let fault_kind = FaultKind::InvalidKeyGenMessageSignature;
|
|
||||||
return Ok(Fault::new(sender_id.clone(), fault_kind).into());
|
|
||||||
}
|
|
||||||
let kgs = match self.key_gen_state {
|
|
||||||
Some(ref mut kgs) => kgs,
|
|
||||||
None => {
|
|
||||||
info!(
|
|
||||||
"Unexpected key gen message from {:?}: {:?}.",
|
|
||||||
sender_id, kg_msg
|
|
||||||
);
|
|
||||||
return Ok(Fault::new(sender_id.clone(), FaultKind::UnexpectedKeyGenMessage).into());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// If the joining node is correct, it will send at most (N + 1)² + 1 key generation
|
|
||||||
// messages.
|
|
||||||
if Some(sender_id) == kgs.change.candidate() {
|
|
||||||
let n = self.netinfo.num_nodes() + 1;
|
|
||||||
if kgs.candidate_msg_count > n * n {
|
|
||||||
info!(
|
|
||||||
"Too many key gen messages from candidate {:?}: {:?}.",
|
|
||||||
sender_id, kg_msg
|
|
||||||
);
|
|
||||||
let fault_kind = FaultKind::TooManyCandidateKeyGenMessages;
|
|
||||||
return Ok(Fault::new(sender_id.clone(), fault_kind).into());
|
|
||||||
}
|
|
||||||
kgs.candidate_msg_count += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
let tx = SignedKeyGenMsg(self.start_epoch, sender_id.clone(), kg_msg, sig);
|
|
||||||
self.key_gen_msg_buffer.push(tx);
|
|
||||||
Ok(FaultLog::default())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Processes all pending batches output by Honey Badger.
|
|
||||||
fn process_output(
|
|
||||||
&mut self,
|
|
||||||
hb_step: honey_badger::Step<InternalContrib<C, NodeUid>, NodeUid>,
|
|
||||||
) -> Result<Step<C, NodeUid>> {
|
|
||||||
let mut step: Step<C, NodeUid> = Step::default();
|
|
||||||
let start_epoch = self.start_epoch;
|
|
||||||
let output = step.extend_with(hb_step, |hb_msg| Message::HoneyBadger(start_epoch, hb_msg));
|
|
||||||
for hb_batch in output {
|
|
||||||
// Create the batch we output ourselves. It will contain the _user_ transactions of
|
|
||||||
// `hb_batch`, and the current change state.
|
|
||||||
let mut batch = Batch::new(hb_batch.epoch + self.start_epoch);
|
|
||||||
|
|
||||||
// Add the user transactions to `batch` and handle votes and DKG messages.
|
|
||||||
for (id, int_contrib) in hb_batch.contributions {
|
|
||||||
let InternalContrib {
|
|
||||||
votes,
|
|
||||||
key_gen_messages,
|
|
||||||
contrib,
|
|
||||||
} = int_contrib;
|
|
||||||
step.fault_log
|
|
||||||
.extend(self.vote_counter.add_committed_votes(&id, votes)?);
|
|
||||||
batch.contributions.insert(id.clone(), contrib);
|
|
||||||
self.key_gen_msg_buffer
|
|
||||||
.retain(|skgm| !key_gen_messages.contains(skgm));
|
|
||||||
for SignedKeyGenMsg(epoch, s_id, kg_msg, sig) in key_gen_messages {
|
|
||||||
if epoch < self.start_epoch {
|
|
||||||
info!("Obsolete key generation message: {:?}.", kg_msg);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if !self.verify_signature(&s_id, &sig, &kg_msg)? {
|
|
||||||
info!(
|
|
||||||
"Invalid signature in {:?}'s batch from {:?} for: {:?}.",
|
|
||||||
id, s_id, kg_msg
|
|
||||||
);
|
|
||||||
let fault_kind = FaultKind::InvalidKeyGenMessageSignature;
|
|
||||||
step.fault_log.append(id.clone(), fault_kind);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
step.extend(match kg_msg {
|
|
||||||
KeyGenMessage::Part(part) => self.handle_part(&s_id, part)?,
|
|
||||||
KeyGenMessage::Ack(ack) => self.handle_ack(&s_id, ack)?.into(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(kgs) = self.take_ready_key_gen() {
|
|
||||||
// If DKG completed, apply the change, restart Honey Badger, and inform the user.
|
|
||||||
debug!("{:?} DKG for {:?} complete!", self.our_id(), kgs.change);
|
|
||||||
self.netinfo = kgs.key_gen.into_network_info();
|
|
||||||
self.restart_honey_badger(batch.epoch + 1);
|
|
||||||
batch.set_change(ChangeState::Complete(kgs.change), &self.netinfo);
|
|
||||||
} else if let Some(change) = self.vote_counter.compute_winner().cloned() {
|
|
||||||
// If there is a new change, restart DKG. Inform the user about the current change.
|
|
||||||
step.extend(self.update_key_gen(batch.epoch + 1, &change)?);
|
|
||||||
batch.set_change(ChangeState::InProgress(change), &self.netinfo);
|
|
||||||
}
|
|
||||||
step.output.push_back(batch);
|
|
||||||
}
|
|
||||||
// If `start_epoch` changed, we can now handle some queued messages.
|
|
||||||
if start_epoch < self.start_epoch {
|
|
||||||
let queue = mem::replace(&mut self.incoming_queue, Vec::new());
|
|
||||||
for (sender_id, msg) in queue {
|
|
||||||
step.extend(self.handle_message(&sender_id, msg)?);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(step)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// If the winner of the vote has changed, restarts Key Generation for the set of nodes implied
|
|
||||||
/// by the current change.
|
|
||||||
fn update_key_gen(&mut self, epoch: u64, change: &Change<NodeUid>) -> Result<Step<C, NodeUid>> {
|
|
||||||
if self.key_gen_state.as_ref().map(|kgs| &kgs.change) == Some(change) {
|
|
||||||
return Ok(Step::default()); // The change is the same as before. Continue DKG as is.
|
|
||||||
}
|
|
||||||
debug!("{:?} Restarting DKG for {:?}.", self.our_id(), change);
|
|
||||||
// Use the existing key shares - with the change applied - as keys for DKG.
|
|
||||||
let mut pub_keys = self.netinfo.public_key_map().clone();
|
|
||||||
if match *change {
|
|
||||||
Change::Remove(ref id) => pub_keys.remove(id).is_none(),
|
|
||||||
Change::Add(ref id, ref pk) => pub_keys.insert(id.clone(), pk.clone()).is_some(),
|
|
||||||
} {
|
|
||||||
info!("{:?} No-op change: {:?}", self.our_id(), change);
|
|
||||||
}
|
|
||||||
self.restart_honey_badger(epoch);
|
|
||||||
// TODO: This needs to be the same as `num_faulty` will be in the _new_
|
|
||||||
// `NetworkInfo` if the change goes through. It would be safer to deduplicate.
|
|
||||||
let threshold = (pub_keys.len() - 1) / 3;
|
|
||||||
let sk = self.netinfo.secret_key().clone();
|
|
||||||
let our_uid = self.our_id().clone();
|
|
||||||
let (key_gen, part) = SyncKeyGen::new(our_uid, sk, pub_keys, threshold);
|
|
||||||
self.key_gen_state = Some(KeyGenState::new(key_gen, change.clone()));
|
|
||||||
if let Some(part) = part {
|
|
||||||
self.send_transaction(KeyGenMessage::Part(part))
|
|
||||||
} else {
|
|
||||||
Ok(Step::default())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Starts a new `HoneyBadger` instance and resets the vote counter.
|
|
||||||
fn restart_honey_badger(&mut self, epoch: u64) {
|
|
||||||
self.start_epoch = epoch;
|
|
||||||
self.key_gen_msg_buffer.retain(|kg_msg| kg_msg.0 >= epoch);
|
|
||||||
let netinfo = Arc::new(self.netinfo.clone());
|
|
||||||
let counter = VoteCounter::new(netinfo.clone(), epoch);
|
|
||||||
mem::replace(&mut self.vote_counter, counter);
|
|
||||||
self.honey_badger = HoneyBadger::builder(netinfo)
|
|
||||||
.max_future_epochs(self.max_future_epochs)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handles a `Part` message that was output by Honey Badger.
|
|
||||||
fn handle_part(&mut self, sender_id: &NodeUid, part: Part) -> Result<Step<C, NodeUid>> {
|
|
||||||
let handle = |kgs: &mut KeyGenState<NodeUid>| kgs.key_gen.handle_part(&sender_id, part);
|
|
||||||
match self.key_gen_state.as_mut().and_then(handle) {
|
|
||||||
Some(PartOutcome::Valid(ack)) => self.send_transaction(KeyGenMessage::Ack(ack)),
|
|
||||||
Some(PartOutcome::Invalid(fault_log)) => Ok(fault_log.into()),
|
|
||||||
None => Ok(Step::default()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handles an `Ack` message that was output by Honey Badger.
|
|
||||||
fn handle_ack(&mut self, sender_id: &NodeUid, ack: Ack) -> Result<FaultLog<NodeUid>> {
|
|
||||||
if let Some(kgs) = self.key_gen_state.as_mut() {
|
|
||||||
Ok(kgs.key_gen.handle_ack(sender_id, ack))
|
|
||||||
} else {
|
|
||||||
Ok(FaultLog::new())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Signs and sends a `KeyGenMessage` and also tries to commit it.
|
|
||||||
fn send_transaction(&mut self, kg_msg: KeyGenMessage) -> Result<Step<C, NodeUid>> {
|
|
||||||
let ser =
|
|
||||||
bincode::serialize(&kg_msg).map_err(|err| ErrorKind::SendTransactionBincode(*err))?;
|
|
||||||
let sig = Box::new(self.netinfo.secret_key().sign(ser));
|
|
||||||
if self.netinfo.is_validator() {
|
|
||||||
let our_uid = self.netinfo.our_uid().clone();
|
|
||||||
let signed_msg =
|
|
||||||
SignedKeyGenMsg(self.start_epoch, our_uid, kg_msg.clone(), *sig.clone());
|
|
||||||
self.key_gen_msg_buffer.push(signed_msg);
|
|
||||||
}
|
|
||||||
let msg = Message::KeyGen(self.start_epoch, kg_msg, sig);
|
|
||||||
Ok(Target::All.message(msg).into())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// If the current Key Generation process is ready, returns the `KeyGenState`.
|
|
||||||
///
|
|
||||||
/// We require the minimum number of completed proposals (`SyncKeyGen::is_ready`) and if a new
|
|
||||||
/// node is joining, we require in addition that the new node's proposal is complete. That way
|
|
||||||
/// the new node knows that it's key is secret, without having to trust any number of nodes.
|
|
||||||
fn take_ready_key_gen(&mut self) -> Option<KeyGenState<NodeUid>> {
|
|
||||||
if self
|
|
||||||
.key_gen_state
|
|
||||||
.as_ref()
|
|
||||||
.map_or(false, KeyGenState::is_ready)
|
|
||||||
{
|
|
||||||
self.key_gen_state.take()
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns `true` if the signature of `kg_msg` by the node with the specified ID is valid.
|
|
||||||
/// Returns an error if the payload fails to serialize.
|
|
||||||
///
|
|
||||||
/// This accepts signatures from both validators and the currently joining candidate, if any.
|
|
||||||
fn verify_signature(
|
|
||||||
&self,
|
|
||||||
node_id: &NodeUid,
|
|
||||||
sig: &Signature,
|
|
||||||
kg_msg: &KeyGenMessage,
|
|
||||||
) -> Result<bool> {
|
|
||||||
let ser =
|
|
||||||
bincode::serialize(kg_msg).map_err(|err| ErrorKind::VerifySignatureBincode(*err))?;
|
|
||||||
let get_candidate_key = || {
|
|
||||||
self.key_gen_state
|
|
||||||
.as_ref()
|
|
||||||
.and_then(|kgs| kgs.candidate_key(node_id))
|
|
||||||
};
|
|
||||||
let pk_opt = self.netinfo.public_key(node_id).or_else(get_candidate_key);
|
|
||||||
Ok(pk_opt.map_or(false, |pk| pk.verify(&sig, ser)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The contribution for the internal `HoneyBadger` instance: this includes a user-defined
|
|
||||||
/// application-level contribution as well as internal signed messages.
|
|
||||||
#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
|
|
||||||
struct InternalContrib<C, NodeUid> {
|
|
||||||
/// A user-defined contribution.
|
|
||||||
contrib: C,
|
|
||||||
/// Key generation messages that get committed via Honey Badger to communicate synchronously.
|
|
||||||
key_gen_messages: Vec<SignedKeyGenMsg<NodeUid>>,
|
|
||||||
/// Signed votes for validator set changes.
|
|
||||||
votes: Vec<SignedVote<NodeUid>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A signed internal message.
|
|
||||||
#[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Clone)]
|
|
||||||
struct SignedKeyGenMsg<NodeUid>(u64, NodeUid, KeyGenMessage, Signature);
|
|
||||||
|
|
||||||
/// An internal message containing a vote for adding or removing a validator, or a message for key
|
/// An internal message containing a vote for adding or removing a validator, or a message for key
|
||||||
/// generation. It gets committed via Honey Badger and is only handled after it has been output in
|
/// generation. It gets committed via Honey Badger and is only handled after it has been output in
|
||||||
/// a batch, so that all nodes see these messages in the same order.
|
/// a batch, so that all nodes see these messages in the same order.
|
||||||
|
@ -598,3 +180,19 @@ impl<NodeUid: Ord + Clone + Debug> KeyGenState<NodeUid> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The contribution for the internal `HoneyBadger` instance: this includes a user-defined
|
||||||
|
/// application-level contribution as well as internal signed messages.
|
||||||
|
#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
|
||||||
|
struct InternalContrib<C, NodeUid> {
|
||||||
|
/// A user-defined contribution.
|
||||||
|
contrib: C,
|
||||||
|
/// Key generation messages that get committed via Honey Badger to communicate synchronously.
|
||||||
|
key_gen_messages: Vec<SignedKeyGenMsg<NodeUid>>,
|
||||||
|
/// Signed votes for validator set changes.
|
||||||
|
votes: Vec<SignedVote<NodeUid>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A signed internal message.
|
||||||
|
#[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Clone)]
|
||||||
|
struct SignedKeyGenMsg<NodeUid>(u64, NodeUid, KeyGenMessage, Signature);
|
||||||
|
|
|
@ -67,11 +67,10 @@ where
|
||||||
sender_id: &NodeUid,
|
sender_id: &NodeUid,
|
||||||
signed_vote: SignedVote<NodeUid>,
|
signed_vote: SignedVote<NodeUid>,
|
||||||
) -> Result<FaultLog<NodeUid>> {
|
) -> Result<FaultLog<NodeUid>> {
|
||||||
if signed_vote.vote.era != self.era
|
if signed_vote.vote.era != self.era || self
|
||||||
|| self
|
.pending
|
||||||
.pending
|
.get(&signed_vote.voter)
|
||||||
.get(&signed_vote.voter)
|
.map_or(false, |sv| sv.vote.num >= signed_vote.vote.num)
|
||||||
.map_or(false, |sv| sv.vote.num >= signed_vote.vote.num)
|
|
||||||
{
|
{
|
||||||
return Ok(FaultLog::new()); // The vote is obsolete or already exists.
|
return Ok(FaultLog::new()); // The vote is obsolete or already exists.
|
||||||
}
|
}
|
||||||
|
@ -150,8 +149,8 @@ where
|
||||||
|
|
||||||
/// Returns `true` if the signature is valid.
|
/// Returns `true` if the signature is valid.
|
||||||
fn validate(&self, signed_vote: &SignedVote<NodeUid>) -> Result<bool> {
|
fn validate(&self, signed_vote: &SignedVote<NodeUid>) -> Result<bool> {
|
||||||
let ser_vote =
|
let ser_vote = bincode::serialize(&signed_vote.vote)
|
||||||
bincode::serialize(&signed_vote.vote).map_err(|err| ErrorKind::ValidateBincode(*err))?;
|
.map_err(|err| ErrorKind::ValidateBincode(*err))?;
|
||||||
let pk_opt = self.netinfo.public_key(&signed_vote.voter);
|
let pk_opt = self.netinfo.public_key(&signed_vote.voter);
|
||||||
Ok(pk_opt.map_or(false, |pk| pk.verify(&signed_vote.sig, ser_vote)))
|
Ok(pk_opt.map_or(false, |pk| pk.verify(&signed_vote.sig, ser_vote)))
|
||||||
}
|
}
|
||||||
|
|
|
@ -270,8 +270,7 @@ where
|
||||||
step.fault_log.append(proposer_id, fault_kind);
|
step.fault_log.append(proposer_id, fault_kind);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
})
|
}).collect();
|
||||||
.collect();
|
|
||||||
let batch = Batch {
|
let batch = Batch {
|
||||||
epoch: self.epoch,
|
epoch: self.epoch,
|
||||||
contributions,
|
contributions,
|
||||||
|
|
|
@ -81,8 +81,7 @@ where
|
||||||
// If there's only one node, it will immediately output on input. Make sure we
|
// If there's only one node, it will immediately output on input. Make sure we
|
||||||
// first process all incoming messages before providing input again.
|
// first process all incoming messages before providing input again.
|
||||||
&& (network.nodes.len() > 2 || node.queue.is_empty())
|
&& (network.nodes.len() > 2 || node.queue.is_empty())
|
||||||
})
|
}).map(|(id, _)| *id)
|
||||||
.map(|(id, _)| *id)
|
|
||||||
.collect();
|
.collect();
|
||||||
if let Some(id) = rng.choose(&input_ids) {
|
if let Some(id) = rng.choose(&input_ids) {
|
||||||
let queue = queues.get_mut(id).unwrap();
|
let queue = queues.get_mut(id).unwrap();
|
||||||
|
|
|
@ -175,8 +175,7 @@ where
|
||||||
epoch,
|
epoch,
|
||||||
contributions,
|
contributions,
|
||||||
}| (epoch, contributions),
|
}| (epoch, contributions),
|
||||||
)
|
).collect();
|
||||||
.collect();
|
|
||||||
if expected.is_none() {
|
if expected.is_none() {
|
||||||
expected = Some(outputs);
|
expected = Some(outputs);
|
||||||
} else if let Some(expected) = &expected {
|
} else if let Some(expected) = &expected {
|
||||||
|
|
|
@ -31,8 +31,7 @@ fn test_sync_key_gen_with(threshold: usize, node_num: usize) {
|
||||||
let (sync_key_gen, proposal) = SyncKeyGen::new(id, sk, pub_keys.clone(), threshold);
|
let (sync_key_gen, proposal) = SyncKeyGen::new(id, sk, pub_keys.clone(), threshold);
|
||||||
nodes.push(sync_key_gen);
|
nodes.push(sync_key_gen);
|
||||||
proposal
|
proposal
|
||||||
})
|
}).collect();
|
||||||
.collect();
|
|
||||||
|
|
||||||
// Handle the first `threshold + 1` proposals. Those should suffice for key generation.
|
// Handle the first `threshold + 1` proposals. Those should suffice for key generation.
|
||||||
let mut acks = Vec::new();
|
let mut acks = Vec::new();
|
||||||
|
@ -72,8 +71,7 @@ fn test_sync_key_gen_with(threshold: usize, node_num: usize) {
|
||||||
let sig = sk.sign(msg);
|
let sig = sk.sign(msg);
|
||||||
assert!(pks.public_key_share(idx).verify(&sig, msg));
|
assert!(pks.public_key_share(idx).verify(&sig, msg));
|
||||||
(idx, sig)
|
(idx, sig)
|
||||||
})
|
}).collect();
|
||||||
.collect();
|
|
||||||
let sig = pub_key_set
|
let sig = pub_key_set
|
||||||
.combine_signatures(sig_shares.iter().take(threshold + 1))
|
.combine_signatures(sig_shares.iter().take(threshold + 1))
|
||||||
.expect("signature shares match");
|
.expect("signature shares match");
|
||||||
|
|
Loading…
Reference in New Issue