mirror of https://github.com/poanetwork/hbbft.git
Merge pull request #99 from poanetwork/report-faulty-nodes
Added logging for faulty nodes
This commit is contained in:
commit
eb22d84db9
|
@ -440,6 +440,7 @@ fn main() {
|
|||
.batch_size(args.flag_b)
|
||||
.build_with_transactions(txs.clone())
|
||||
.expect("Instantiate honey_badger")
|
||||
.0
|
||||
};
|
||||
let hw_quality = HwQuality {
|
||||
latency: Duration::from_millis(args.flag_lag),
|
||||
|
|
|
@ -75,6 +75,7 @@ use itertools::Itertools;
|
|||
use agreement::bin_values::BinValues;
|
||||
use common_coin;
|
||||
use common_coin::{CommonCoin, CommonCoinMessage};
|
||||
use fault_log::FaultLog;
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
|
||||
|
||||
error_chain!{
|
||||
|
@ -195,7 +196,7 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
|
|||
type Message = AgreementMessage;
|
||||
type Error = Error;
|
||||
|
||||
fn input(&mut self, input: Self::Input) -> AgreementResult<()> {
|
||||
fn input(&mut self, input: Self::Input) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
self.set_input(input)
|
||||
}
|
||||
|
||||
|
@ -204,20 +205,21 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
|
|||
&mut self,
|
||||
sender_id: &Self::NodeUid,
|
||||
message: Self::Message,
|
||||
) -> AgreementResult<()> {
|
||||
) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
if self.terminated || message.epoch < self.epoch {
|
||||
return Ok(()); // Message is obsolete: We are already in a later epoch or terminated.
|
||||
// Message is obsolete; we're already in a later epoch or terminated.
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
if message.epoch > self.epoch {
|
||||
// Message is for a later epoch. We can't handle that yet.
|
||||
self.incoming_queue.push((sender_id.clone(), message));
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
match message.content {
|
||||
AgreementContent::BVal(b) => self.handle_bval(sender_id, b),
|
||||
AgreementContent::Aux(b) => self.handle_aux(sender_id, b),
|
||||
AgreementContent::Conf(v) => self.handle_conf(sender_id, v),
|
||||
AgreementContent::Term(v) => self.handle_term(sender_id, v),
|
||||
AgreementContent::Term(v) => self.handle_term(sender_id, v).map(|()| FaultLog::new()),
|
||||
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg),
|
||||
}
|
||||
}
|
||||
|
@ -282,15 +284,15 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
}
|
||||
|
||||
/// Sets the input value for agreement.
|
||||
pub fn set_input(&mut self, input: bool) -> AgreementResult<()> {
|
||||
pub fn set_input(&mut self, input: bool) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
if self.epoch != 0 || self.estimated.is_some() {
|
||||
return Err(ErrorKind::InputNotAccepted.into());
|
||||
}
|
||||
if self.netinfo.num_nodes() == 1 {
|
||||
self.send_bval(input)?;
|
||||
self.send_aux(input)?;
|
||||
let mut fault_log = self.send_bval(input)?;
|
||||
self.send_aux(input)?.merge_into(&mut fault_log);
|
||||
self.decide(input);
|
||||
Ok(())
|
||||
Ok(fault_log)
|
||||
} else {
|
||||
// Set the initial estimated value to the input value.
|
||||
self.estimated = Some(input);
|
||||
|
@ -304,7 +306,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
self.epoch == 0 && self.estimated.is_none()
|
||||
}
|
||||
|
||||
fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> AgreementResult<()> {
|
||||
fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
self.received_bval
|
||||
.entry(sender_id.clone())
|
||||
.or_insert_with(BTreeSet::new)
|
||||
|
@ -329,27 +331,27 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
} else if bin_values_changed {
|
||||
self.on_bin_values_changed()
|
||||
} else {
|
||||
Ok(())
|
||||
Ok(FaultLog::new())
|
||||
}
|
||||
} else if count_bval == self.netinfo.num_faulty() + 1 && !self.sent_bval.contains(&b) {
|
||||
// upon receiving `BVal(b)` messages from f + 1 nodes, if
|
||||
// `BVal(b)` has not been sent, multicast `BVal(b)`
|
||||
self.send_bval(b)
|
||||
} else {
|
||||
Ok(())
|
||||
Ok(FaultLog::new())
|
||||
}
|
||||
}
|
||||
|
||||
/// Called when `bin_values` changes as a result of receiving a `BVal` message. Tries to update
|
||||
/// the epoch.
|
||||
fn on_bin_values_changed(&mut self) -> AgreementResult<()> {
|
||||
fn on_bin_values_changed(&mut self) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
match self.coin_schedule {
|
||||
CoinSchedule::False => {
|
||||
let (aux_count, aux_vals) = self.count_aux();
|
||||
if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() {
|
||||
self.on_coin(false, aux_vals.definite())
|
||||
} else {
|
||||
Ok(())
|
||||
Ok(FaultLog::new())
|
||||
}
|
||||
}
|
||||
CoinSchedule::True => {
|
||||
|
@ -357,7 +359,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
if aux_count >= self.netinfo.num_nodes() - self.netinfo.num_faulty() {
|
||||
self.on_coin(true, aux_vals.definite())
|
||||
} else {
|
||||
Ok(())
|
||||
Ok(FaultLog::new())
|
||||
}
|
||||
}
|
||||
CoinSchedule::Random => {
|
||||
|
@ -368,9 +370,9 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
}
|
||||
}
|
||||
|
||||
fn send_bval(&mut self, b: bool) -> AgreementResult<()> {
|
||||
fn send_bval(&mut self, b: bool) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
if !self.netinfo.is_validator() {
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
// Record the value `b` as sent.
|
||||
self.sent_bval.insert(b);
|
||||
|
@ -382,17 +384,17 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
self.handle_bval(our_uid, b)
|
||||
}
|
||||
|
||||
fn send_conf(&mut self) -> AgreementResult<()> {
|
||||
fn send_conf(&mut self) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
if self.conf_round {
|
||||
// Only one `Conf` message is allowed in an epoch.
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
|
||||
// Trigger the start of the `Conf` round.
|
||||
self.conf_round = true;
|
||||
|
||||
if !self.netinfo.is_validator() {
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
|
||||
let v = self.bin_values;
|
||||
|
@ -409,19 +411,19 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
/// bin_values (note that bin_values_r may continue to change as `BVal`
|
||||
/// messages are received, thus this condition may be triggered upon arrival
|
||||
/// of either an `Aux` or a `BVal` message).
|
||||
fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> AgreementResult<()> {
|
||||
fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
// Perform the `Aux` message round only if a `Conf` round hasn't started yet.
|
||||
if self.conf_round {
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
self.received_aux.insert(sender_id.clone(), b);
|
||||
if self.bin_values == BinValues::None {
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
let (aux_count, aux_vals) = self.count_aux();
|
||||
if aux_count < self.netinfo.num_nodes() - self.netinfo.num_faulty() {
|
||||
// Continue waiting for the (N - f) `Aux` messages.
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
|
||||
// Execute the Common Coin schedule `false, true, get_coin(), false, true, get_coin(), ...`
|
||||
|
@ -435,7 +437,11 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
}
|
||||
}
|
||||
|
||||
fn handle_conf(&mut self, sender_id: &NodeUid, v: BinValues) -> AgreementResult<()> {
|
||||
fn handle_conf(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
v: BinValues,
|
||||
) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
self.received_conf.insert(sender_id.clone(), v);
|
||||
self.try_finish_conf_round()
|
||||
}
|
||||
|
@ -457,21 +463,30 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
|
||||
/// Handles a Common Coin message. If there is output from Common Coin, starts the next
|
||||
/// epoch. The function may output a decision value.
|
||||
fn handle_coin(&mut self, sender_id: &NodeUid, msg: CommonCoinMessage) -> AgreementResult<()> {
|
||||
self.common_coin.handle_message(sender_id, msg)?;
|
||||
fn handle_coin(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
msg: CommonCoinMessage,
|
||||
) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
let mut fault_log = self.common_coin.handle_message(sender_id, msg)?;
|
||||
self.extend_common_coin();
|
||||
|
||||
if let Some(coin) = self.common_coin.next_output() {
|
||||
let def_bin_value = self.count_conf().1.definite();
|
||||
self.on_coin(coin, def_bin_value)?;
|
||||
self.on_coin(coin, def_bin_value)?
|
||||
.merge_into(&mut fault_log);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// When the common coin has been computed, tries to decide on an output value, updates the
|
||||
/// `Agreement` epoch and handles queued messages for the new epoch.
|
||||
fn on_coin(&mut self, coin: bool, def_bin_value: Option<bool>) -> AgreementResult<()> {
|
||||
fn on_coin(
|
||||
&mut self,
|
||||
coin: bool,
|
||||
def_bin_value: Option<bool>,
|
||||
) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
let b = if let Some(b) = def_bin_value {
|
||||
// Outputting a value is allowed only once.
|
||||
if self.decision.is_none() && b == coin {
|
||||
|
@ -485,12 +500,13 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
self.update_epoch();
|
||||
|
||||
self.estimated = Some(b);
|
||||
self.send_bval(b)?;
|
||||
let mut fault_log = self.send_bval(b)?;
|
||||
let queued_msgs = replace(&mut self.incoming_queue, Vec::new());
|
||||
for (sender_id, msg) in queued_msgs {
|
||||
self.handle_message(&sender_id, msg)?;
|
||||
self.handle_message(&sender_id, msg)?
|
||||
.merge_into(&mut fault_log);
|
||||
}
|
||||
Ok(())
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Computes the coin schedule for the current `Agreement` epoch.
|
||||
|
@ -534,23 +550,23 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
);
|
||||
}
|
||||
|
||||
fn try_finish_conf_round(&mut self) -> AgreementResult<()> {
|
||||
fn try_finish_conf_round(&mut self) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
if self.conf_round {
|
||||
let (count_vals, _) = self.count_conf();
|
||||
if count_vals < self.netinfo.num_nodes() - self.netinfo.num_faulty() {
|
||||
// Continue waiting for (N - f) `Conf` messages
|
||||
return Ok(());
|
||||
return Ok(fault_log);
|
||||
}
|
||||
// Invoke the comon coin.
|
||||
self.common_coin.input(())?;
|
||||
self.common_coin.input(())?.merge_into(&mut fault_log);
|
||||
self.extend_common_coin();
|
||||
}
|
||||
Ok(())
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
fn send_aux(&mut self, b: bool) -> AgreementResult<()> {
|
||||
fn send_aux(&mut self, b: bool) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
if !self.netinfo.is_validator() {
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
// Multicast `Aux`.
|
||||
self.messages
|
||||
|
|
|
@ -52,6 +52,7 @@ use reed_solomon_erasure as rse;
|
|||
use reed_solomon_erasure::ReedSolomon;
|
||||
use ring::digest;
|
||||
|
||||
use fault_log::{FaultKind, FaultLog};
|
||||
use fmt::{HexBytes, HexList, HexProof};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
|
||||
|
||||
|
@ -126,7 +127,7 @@ impl<NodeUid: Debug + Clone + Ord> DistAlgorithm for Broadcast<NodeUid> {
|
|||
type Message = BroadcastMessage;
|
||||
type Error = Error;
|
||||
|
||||
fn input(&mut self, input: Self::Input) -> BroadcastResult<()> {
|
||||
fn input(&mut self, input: Self::Input) -> BroadcastResult<FaultLog<NodeUid>> {
|
||||
if *self.netinfo.our_uid() != self.proposer_id {
|
||||
return Err(ErrorKind::InstanceCannotPropose.into());
|
||||
}
|
||||
|
@ -142,14 +143,16 @@ impl<NodeUid: Debug + Clone + Ord> DistAlgorithm for Broadcast<NodeUid> {
|
|||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
message: Self::Message,
|
||||
) -> BroadcastResult<()> {
|
||||
) -> BroadcastResult<FaultLog<NodeUid>> {
|
||||
if !self.netinfo.all_uids().contains(sender_id) {
|
||||
return Err(ErrorKind::UnknownSender.into());
|
||||
}
|
||||
match message {
|
||||
BroadcastMessage::Value(p) => self.handle_value(sender_id, p),
|
||||
BroadcastMessage::Echo(p) => self.handle_echo(sender_id, p),
|
||||
BroadcastMessage::Ready(ref hash) => self.handle_ready(sender_id, hash),
|
||||
BroadcastMessage::Ready(ref hash) => {
|
||||
self.handle_ready(sender_id, hash).map(|()| FaultLog::new())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -272,9 +275,12 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
|
|||
}
|
||||
|
||||
/// Handles a received echo and verifies the proof it contains.
|
||||
fn handle_value(&mut self, sender_id: &NodeUid, p: Proof<Vec<u8>>) -> BroadcastResult<()> {
|
||||
// If the sender is not the proposer, this is not the first `Value` or the proof is invalid,
|
||||
// ignore.
|
||||
fn handle_value(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
p: Proof<Vec<u8>>,
|
||||
) -> BroadcastResult<FaultLog<NodeUid>> {
|
||||
// If the sender is not the proposer or if this is not the first `Value`, ignore.
|
||||
if *sender_id != self.proposer_id {
|
||||
info!(
|
||||
"Node {:?} received Value from {:?} instead of {:?}.",
|
||||
|
@ -282,17 +288,22 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
|
|||
sender_id,
|
||||
self.proposer_id
|
||||
);
|
||||
return Ok(());
|
||||
let fault_kind = FaultKind::ReceivedValueFromNonProposer;
|
||||
return Ok(FaultLog::init(sender_id.clone(), fault_kind));
|
||||
}
|
||||
if self.echo_sent {
|
||||
info!(
|
||||
"Node {:?} received multiple Values.",
|
||||
self.netinfo.our_uid()
|
||||
);
|
||||
return Ok(());
|
||||
// TODO: should receiving two Values from a node be considered
|
||||
// a fault? If so, return a `Fault` here. For now, ignore.
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
|
||||
// If the proof is invalid, log the faulty node behavior and ignore.
|
||||
if !self.validate_proof(&p, &self.netinfo.our_uid()) {
|
||||
return Ok(());
|
||||
return Ok(FaultLog::init(sender_id.clone(), FaultKind::InvalidProof));
|
||||
}
|
||||
|
||||
// Otherwise multicast the proof in an `Echo` message, and handle it ourselves.
|
||||
|
@ -300,18 +311,26 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
|
|||
}
|
||||
|
||||
/// Handles a received `Echo` message.
|
||||
fn handle_echo(&mut self, sender_id: &NodeUid, p: Proof<Vec<u8>>) -> BroadcastResult<()> {
|
||||
// If the proof is invalid or the sender has already sent `Echo`, ignore.
|
||||
fn handle_echo(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
p: Proof<Vec<u8>>,
|
||||
) -> BroadcastResult<FaultLog<NodeUid>> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
// If the sender has already sent `Echo`, ignore.
|
||||
if self.echos.contains_key(sender_id) {
|
||||
info!(
|
||||
"Node {:?} received multiple Echos from {:?}.",
|
||||
self.netinfo.our_uid(),
|
||||
sender_id,
|
||||
);
|
||||
return Ok(());
|
||||
return Ok(fault_log);
|
||||
}
|
||||
|
||||
// If the proof is invalid, log the faulty-node behavior, and ignore.
|
||||
if !self.validate_proof(&p, sender_id) {
|
||||
return Ok(());
|
||||
fault_log.append(sender_id.clone(), FaultKind::InvalidProof);
|
||||
return Ok(fault_log);
|
||||
}
|
||||
|
||||
let hash = p.root_hash.clone();
|
||||
|
@ -322,11 +341,13 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
|
|||
if self.ready_sent
|
||||
|| self.count_echos(&hash) < self.netinfo.num_nodes() - self.netinfo.num_faulty()
|
||||
{
|
||||
return self.compute_output(&hash);
|
||||
self.compute_output(&hash)?;
|
||||
return Ok(fault_log);
|
||||
}
|
||||
|
||||
// Upon receiving _N - f_ `Echo`s with this root hash, multicast `Ready`.
|
||||
self.send_ready(&hash)
|
||||
// Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`.
|
||||
self.send_ready(&hash)?;
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Handles a received `Ready` message.
|
||||
|
@ -353,10 +374,10 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
|
|||
}
|
||||
|
||||
/// Sends an `Echo` message and handles it. Does nothing if we are only an observer.
|
||||
fn send_echo(&mut self, p: Proof<Vec<u8>>) -> BroadcastResult<()> {
|
||||
fn send_echo(&mut self, p: Proof<Vec<u8>>) -> BroadcastResult<FaultLog<NodeUid>> {
|
||||
self.echo_sent = true;
|
||||
if !self.netinfo.is_validator() {
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
let echo_msg = Target::All.message(BroadcastMessage::Echo(p.clone()));
|
||||
self.messages.push_back(echo_msg);
|
||||
|
|
|
@ -27,6 +27,7 @@ use std::rc::Rc;
|
|||
|
||||
use crypto::error as cerror;
|
||||
use crypto::Signature;
|
||||
use fault_log::{FaultKind, FaultLog};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
|
||||
|
||||
error_chain! {
|
||||
|
@ -89,19 +90,23 @@ where
|
|||
type Error = Error;
|
||||
|
||||
/// Sends our threshold signature share if not yet sent.
|
||||
fn input(&mut self, _input: Self::Input) -> Result<()> {
|
||||
fn input(&mut self, _input: Self::Input) -> Result<FaultLog<NodeUid>> {
|
||||
if !self.had_input {
|
||||
self.had_input = true;
|
||||
self.get_coin()
|
||||
} else {
|
||||
Ok(())
|
||||
Ok(FaultLog::new())
|
||||
}
|
||||
}
|
||||
|
||||
/// Receives input from a remote node.
|
||||
fn handle_message(&mut self, sender_id: &Self::NodeUid, message: Self::Message) -> Result<()> {
|
||||
fn handle_message(
|
||||
&mut self,
|
||||
sender_id: &Self::NodeUid,
|
||||
message: Self::Message,
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
if self.terminated {
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
let CommonCoinMessage(share) = message;
|
||||
self.handle_share(sender_id, share)
|
||||
|
@ -146,9 +151,10 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
fn get_coin(&mut self) -> Result<()> {
|
||||
fn get_coin(&mut self) -> Result<FaultLog<NodeUid>> {
|
||||
if !self.netinfo.is_validator() {
|
||||
return self.try_output();
|
||||
self.try_output()?;
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
let share = self.netinfo.secret_key().sign(&self.nonce);
|
||||
self.messages.push_back(CommonCoinMessage(share.clone()));
|
||||
|
@ -156,17 +162,20 @@ where
|
|||
self.handle_share(&id, share)
|
||||
}
|
||||
|
||||
fn handle_share(&mut self, sender_id: &NodeUid, share: Signature) -> Result<()> {
|
||||
fn handle_share(&mut self, sender_id: &NodeUid, share: Signature) -> Result<FaultLog<NodeUid>> {
|
||||
if let Some(pk_i) = self.netinfo.public_key_share(sender_id) {
|
||||
if !pk_i.verify(&share, &self.nonce) {
|
||||
// Silently ignore the invalid share.
|
||||
return Ok(());
|
||||
// Log the faulty node and ignore the invalid share.
|
||||
let fault_kind = FaultKind::UnverifiedSignatureShareSender;
|
||||
let fault_log = FaultLog::init(sender_id.clone(), fault_kind);
|
||||
return Ok(fault_log);
|
||||
}
|
||||
self.received_shares.insert(sender_id.clone(), share);
|
||||
} else {
|
||||
return Err(ErrorKind::UnknownSender.into());
|
||||
}
|
||||
self.try_output()
|
||||
self.try_output()?;
|
||||
Ok(FaultLog::new())
|
||||
}
|
||||
|
||||
fn try_output(&mut self) -> Result<()> {
|
||||
|
|
|
@ -27,10 +27,9 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
|||
use std::fmt::Debug;
|
||||
use std::rc::Rc;
|
||||
|
||||
use agreement;
|
||||
use agreement::{Agreement, AgreementMessage};
|
||||
use broadcast;
|
||||
use broadcast::{Broadcast, BroadcastMessage};
|
||||
use agreement::{self, Agreement, AgreementMessage, AgreementResult};
|
||||
use broadcast::{self, Broadcast, BroadcastMessage, BroadcastResult};
|
||||
use fault_log::FaultLog;
|
||||
use fmt::HexBytes;
|
||||
use messaging::{DistAlgorithm, NetworkInfo, TargetedMessage};
|
||||
|
||||
|
@ -109,7 +108,7 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for CommonSubset<NodeUid> {
|
|||
type Message = Message<NodeUid>;
|
||||
type Error = Error;
|
||||
|
||||
fn input(&mut self, input: Self::Input) -> CommonSubsetResult<()> {
|
||||
fn input(&mut self, input: Self::Input) -> CommonSubsetResult<FaultLog<NodeUid>> {
|
||||
debug!(
|
||||
"{:?} Proposing {:?}",
|
||||
self.netinfo.our_uid(),
|
||||
|
@ -122,7 +121,7 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for CommonSubset<NodeUid> {
|
|||
&mut self,
|
||||
sender_id: &Self::NodeUid,
|
||||
message: Self::Message,
|
||||
) -> CommonSubsetResult<()> {
|
||||
) -> CommonSubsetResult<FaultLog<NodeUid>> {
|
||||
match message {
|
||||
Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg),
|
||||
Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, a_msg),
|
||||
|
@ -180,9 +179,12 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
|||
|
||||
/// Common Subset input message handler. It receives a value for broadcast
|
||||
/// and redirects it to the corresponding broadcast instance.
|
||||
pub fn send_proposed_value(&mut self, value: ProposedValue) -> CommonSubsetResult<()> {
|
||||
pub fn send_proposed_value(
|
||||
&mut self,
|
||||
value: ProposedValue,
|
||||
) -> CommonSubsetResult<FaultLog<NodeUid>> {
|
||||
if !self.netinfo.is_validator() {
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
let uid = self.netinfo.our_uid().clone();
|
||||
// Upon receiving input v_i , input v_i to RBC_i. See Figure 2.
|
||||
|
@ -196,7 +198,7 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
|||
sender_id: &NodeUid,
|
||||
proposer_id: &NodeUid,
|
||||
bmessage: BroadcastMessage,
|
||||
) -> CommonSubsetResult<()> {
|
||||
) -> CommonSubsetResult<FaultLog<NodeUid>> {
|
||||
self.process_broadcast(proposer_id, |bc| bc.handle_message(sender_id, bmessage))
|
||||
}
|
||||
|
||||
|
@ -207,7 +209,7 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
|||
sender_id: &NodeUid,
|
||||
proposer_id: &NodeUid,
|
||||
amessage: AgreementMessage,
|
||||
) -> CommonSubsetResult<()> {
|
||||
) -> CommonSubsetResult<FaultLog<NodeUid>> {
|
||||
// Send the message to the local instance of Agreement
|
||||
self.process_agreement(proposer_id, |agreement| {
|
||||
agreement.handle_message(sender_id, amessage)
|
||||
|
@ -216,53 +218,66 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
|||
|
||||
/// Upon delivery of v_j from RBC_j, if input has not yet been provided to
|
||||
/// BA_j, then provide input 1 to BA_j. See Figure 11.
|
||||
fn process_broadcast<F>(&mut self, proposer_id: &NodeUid, f: F) -> CommonSubsetResult<()>
|
||||
fn process_broadcast<F>(
|
||||
&mut self,
|
||||
proposer_id: &NodeUid,
|
||||
f: F,
|
||||
) -> CommonSubsetResult<FaultLog<NodeUid>>
|
||||
where
|
||||
F: FnOnce(&mut Broadcast<NodeUid>) -> Result<(), broadcast::Error>,
|
||||
F: FnOnce(&mut Broadcast<NodeUid>) -> BroadcastResult<FaultLog<NodeUid>>,
|
||||
{
|
||||
let mut fault_log = FaultLog::new();
|
||||
let value = {
|
||||
let broadcast = self
|
||||
.broadcast_instances
|
||||
.get_mut(proposer_id)
|
||||
.ok_or(ErrorKind::NoSuchBroadcastInstance)?;
|
||||
f(broadcast)?;
|
||||
f(broadcast)?.merge_into(&mut fault_log);
|
||||
self.messages.extend_broadcast(&proposer_id, broadcast);
|
||||
if let Some(output) = broadcast.next_output() {
|
||||
output
|
||||
} else {
|
||||
return Ok(());
|
||||
return Ok(fault_log);
|
||||
}
|
||||
};
|
||||
self.broadcast_results.insert(proposer_id.clone(), value);
|
||||
self.process_agreement(proposer_id, |agreement| {
|
||||
let set_agreement_input = |agreement: &mut Agreement<NodeUid>| {
|
||||
if agreement.accepts_input() {
|
||||
agreement.set_input(true)
|
||||
} else {
|
||||
Ok(())
|
||||
Ok(FaultLog::new())
|
||||
}
|
||||
})
|
||||
};
|
||||
self.process_agreement(proposer_id, set_agreement_input)?
|
||||
.merge_into(&mut fault_log);
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Callback to be invoked on receipt of the decision value of the Agreement
|
||||
/// instance `uid`.
|
||||
fn process_agreement<F>(&mut self, proposer_id: &NodeUid, f: F) -> CommonSubsetResult<()>
|
||||
fn process_agreement<F>(
|
||||
&mut self,
|
||||
proposer_id: &NodeUid,
|
||||
f: F,
|
||||
) -> CommonSubsetResult<FaultLog<NodeUid>>
|
||||
where
|
||||
F: FnOnce(&mut Agreement<NodeUid>) -> Result<(), agreement::Error>,
|
||||
F: FnOnce(&mut Agreement<NodeUid>) -> AgreementResult<FaultLog<NodeUid>>,
|
||||
{
|
||||
let mut fault_log = FaultLog::new();
|
||||
let value = {
|
||||
let agreement = self
|
||||
.agreement_instances
|
||||
.get_mut(proposer_id)
|
||||
.ok_or(ErrorKind::NoSuchAgreementInstance)?;
|
||||
if agreement.terminated() {
|
||||
return Ok(());
|
||||
return Ok(fault_log);
|
||||
}
|
||||
f(agreement)?;
|
||||
f(agreement)?.merge_into(&mut fault_log);
|
||||
self.messages.extend_agreement(proposer_id, agreement);
|
||||
if let Some(output) = agreement.next_output() {
|
||||
output
|
||||
} else {
|
||||
return Ok(());
|
||||
return Ok(fault_log);
|
||||
}
|
||||
};
|
||||
if self
|
||||
|
@ -283,7 +298,7 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
|||
// input 0 to each instance of BA that has not yet been provided input.
|
||||
for (uid, agreement) in &mut self.agreement_instances {
|
||||
if agreement.accepts_input() {
|
||||
agreement.set_input(false)?;
|
||||
agreement.set_input(false)?.merge_into(&mut fault_log);
|
||||
self.messages.extend_agreement(uid, agreement);
|
||||
if let Some(output) = agreement.next_output() {
|
||||
if self.agreement_results.insert(uid.clone(), output).is_some() {
|
||||
|
@ -294,7 +309,7 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
|||
}
|
||||
}
|
||||
self.try_agreement_completion();
|
||||
Ok(())
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Returns the number of agreement instances that have decided "yes".
|
||||
|
|
|
@ -57,9 +57,10 @@ use clear_on_drop::ClearOnDrop;
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crypto::{PublicKey, PublicKeySet, SecretKey, Signature};
|
||||
use fault_log::{FaultKind, FaultLog};
|
||||
use honey_badger::{self, Batch as HbBatch, HoneyBadger, Message as HbMessage};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
|
||||
use sync_key_gen::{Accept, Propose, SyncKeyGen};
|
||||
use sync_key_gen::{Accept, Propose, ProposeOutcome, SyncKeyGen};
|
||||
|
||||
type KeyGenOutput = (PublicKeySet, Option<ClearOnDrop<Box<SecretKey>>>);
|
||||
|
||||
|
@ -170,11 +171,11 @@ where
|
|||
}
|
||||
|
||||
/// Creates a new Dynamic Honey Badger instance with an empty buffer.
|
||||
pub fn build(&self) -> Result<DynamicHoneyBadger<Tx, NodeUid>>
|
||||
pub fn build(&self) -> Result<(DynamicHoneyBadger<Tx, NodeUid>, FaultLog<NodeUid>)>
|
||||
where
|
||||
Tx: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq,
|
||||
{
|
||||
let honey_badger = HoneyBadger::builder(Rc::new(self.netinfo.clone()))
|
||||
let (honey_badger, fault_log) = HoneyBadger::builder(Rc::new(self.netinfo.clone()))
|
||||
.batch_size(self.batch_size)
|
||||
.max_future_epochs(self.max_future_epochs)
|
||||
.build()?;
|
||||
|
@ -190,7 +191,7 @@ where
|
|||
messages: MessageQueue(VecDeque::new()),
|
||||
output: VecDeque::new(),
|
||||
};
|
||||
Ok(dyn_hb)
|
||||
Ok((dyn_hb, fault_log))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -235,28 +236,33 @@ where
|
|||
type Message = Message<NodeUid>;
|
||||
type Error = Error;
|
||||
|
||||
fn input(&mut self, input: Self::Input) -> Result<()> {
|
||||
fn input(&mut self, input: Self::Input) -> Result<FaultLog<NodeUid>> {
|
||||
// User transactions are forwarded to `HoneyBadger` right away. Internal messages are
|
||||
// in addition signed and broadcast.
|
||||
match input {
|
||||
Input::User(tx) => {
|
||||
self.honey_badger.input(Transaction::User(tx))?;
|
||||
self.process_output()
|
||||
let mut fault_log = self.honey_badger.input(Transaction::User(tx))?;
|
||||
self.process_output()?.merge_into(&mut fault_log);
|
||||
Ok(fault_log)
|
||||
}
|
||||
Input::Change(change) => self.send_transaction(NodeTransaction::Change(change)),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_message(&mut self, sender_id: &NodeUid, message: Self::Message) -> Result<()> {
|
||||
fn handle_message(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
message: Self::Message,
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
let epoch = message.epoch();
|
||||
if epoch < self.start_epoch {
|
||||
return Ok(()); // Obsolete message.
|
||||
return Ok(FaultLog::new()); // Obsolete message.
|
||||
}
|
||||
if epoch > self.start_epoch {
|
||||
// Message cannot be handled yet. Save it for later.
|
||||
let entry = (sender_id.clone(), message);
|
||||
self.incoming_queue.push(entry);
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
match message {
|
||||
Message::HoneyBadger(_, hb_msg) => self.handle_honey_badger_message(sender_id, hb_msg),
|
||||
|
@ -297,14 +303,15 @@ where
|
|||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
message: HbMessage<NodeUid>,
|
||||
) -> Result<()> {
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
if !self.netinfo.all_uids().contains(sender_id) {
|
||||
info!("Unknown sender {:?} of message {:?}", sender_id, message);
|
||||
return Err(ErrorKind::UnknownSender.into());
|
||||
}
|
||||
// Handle the message and put the outgoing messages into the queue.
|
||||
self.honey_badger.handle_message(sender_id, message)?;
|
||||
self.process_output()
|
||||
let mut fault_log = self.honey_badger.handle_message(sender_id, message)?;
|
||||
self.process_output()?.merge_into(&mut fault_log);
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Handles a vote or key generation message and tries to commit it as a transaction. These
|
||||
|
@ -314,15 +321,17 @@ where
|
|||
sender_id: &NodeUid,
|
||||
node_tx: NodeTransaction<NodeUid>,
|
||||
sig: Box<Signature>,
|
||||
) -> Result<()> {
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
self.verify_signature(sender_id, &*sig, &node_tx)?;
|
||||
let tx = Transaction::Signed(self.start_epoch, sender_id.clone(), node_tx, sig);
|
||||
self.honey_badger.input(tx)?;
|
||||
self.process_output()
|
||||
let mut fault_log = self.honey_badger.input(tx)?;
|
||||
self.process_output()?.merge_into(&mut fault_log);
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Processes all pending batches output by Honey Badger.
|
||||
fn process_output(&mut self) -> Result<()> {
|
||||
fn process_output(&mut self) -> Result<FaultLog<NodeUid>> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
let start_epoch = self.start_epoch;
|
||||
while let Some(hb_batch) = self.honey_badger.next_output() {
|
||||
// Create the batch we output ourselves. It will contain the _user_ transactions of
|
||||
|
@ -342,13 +351,19 @@ where
|
|||
}
|
||||
if !self.verify_signature(&s_id, &sig, &node_tx)? {
|
||||
info!("Invalid signature from {:?} for: {:?}.", s_id, node_tx);
|
||||
let fault_kind = FaultKind::InvalidNodeTransactionSignature;
|
||||
fault_log.append(s_id.clone(), fault_kind);
|
||||
continue;
|
||||
}
|
||||
use self::NodeTransaction::*;
|
||||
match node_tx {
|
||||
Change(change) => self.handle_vote(s_id, change),
|
||||
Propose(propose) => self.handle_propose(&s_id, propose)?,
|
||||
Accept(accept) => self.handle_accept(&s_id, accept)?,
|
||||
Propose(propose) => self
|
||||
.handle_propose(&s_id, propose)?
|
||||
.merge_into(&mut fault_log),
|
||||
Accept(accept) => self
|
||||
.handle_accept(&s_id, accept)?
|
||||
.merge_into(&mut fault_log),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -362,11 +377,13 @@ where
|
|||
ClearOnDrop::new(Box::new(self.netinfo.secret_key().clone()))
|
||||
});
|
||||
// Restart Honey Badger in the next epoch, and inform the user about the change.
|
||||
self.apply_change(&change, pub_key_set, sk, batch.epoch + 1)?;
|
||||
self.apply_change(&change, pub_key_set, sk, batch.epoch + 1)?
|
||||
.merge_into(&mut fault_log);
|
||||
batch.change = ChangeState::Complete(change);
|
||||
} else {
|
||||
// If the majority changed, restart DKG. Inform the user about the current change.
|
||||
self.update_key_gen(batch.epoch + 1)?;
|
||||
self.update_key_gen(batch.epoch + 1)?
|
||||
.merge_into(&mut fault_log);
|
||||
if let Some((_, ref change)) = self.key_gen {
|
||||
batch.change = ChangeState::InProgress(change.clone());
|
||||
}
|
||||
|
@ -379,10 +396,11 @@ where
|
|||
if start_epoch < self.start_epoch {
|
||||
let queue = mem::replace(&mut self.incoming_queue, Vec::new());
|
||||
for (sender_id, msg) in queue {
|
||||
self.handle_message(&sender_id, msg)?;
|
||||
self.handle_message(&sender_id, msg)?
|
||||
.merge_into(&mut fault_log);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Restarts Honey Badger with a new set of nodes, and resets the Key Generation.
|
||||
|
@ -392,7 +410,7 @@ where
|
|||
pub_key_set: PublicKeySet,
|
||||
sk: ClearOnDrop<Box<SecretKey>>,
|
||||
epoch: u64,
|
||||
) -> Result<()> {
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
self.votes.clear();
|
||||
self.key_gen = None;
|
||||
let mut all_uids = self.netinfo.all_uids().clone();
|
||||
|
@ -409,15 +427,16 @@ where
|
|||
|
||||
/// If the majority of votes has changed, restarts Key Generation for the set of nodes implied
|
||||
/// by the current change.
|
||||
fn update_key_gen(&mut self, epoch: u64) -> Result<()> {
|
||||
fn update_key_gen(&mut self, epoch: u64) -> Result<FaultLog<NodeUid>> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
let change = match current_majority(&self.votes, &self.netinfo) {
|
||||
None => {
|
||||
self.key_gen = None;
|
||||
return Ok(());
|
||||
return Ok(fault_log);
|
||||
}
|
||||
Some(change) => {
|
||||
if self.key_gen.as_ref().map(|&(_, ref ch)| ch) == Some(change) {
|
||||
return Ok(()); // The change is the same as last epoch. Continue DKG as is.
|
||||
return Ok(fault_log); // The change is the same as last epoch. Continue DKG as is.
|
||||
}
|
||||
change.clone()
|
||||
}
|
||||
|
@ -432,7 +451,7 @@ where
|
|||
info!("{:?} No-op change: {:?}", self.our_id(), change);
|
||||
}
|
||||
if change.candidate().is_some() {
|
||||
self.restart_honey_badger(epoch)?;
|
||||
self.restart_honey_badger(epoch)?.merge_into(&mut fault_log);
|
||||
}
|
||||
// TODO: This needs to be the same as `num_faulty` will be in the _new_
|
||||
// `NetworkInfo` if the change goes through. It would be safer to deduplicate.
|
||||
|
@ -442,19 +461,20 @@ where
|
|||
let (key_gen, propose) = SyncKeyGen::new(&our_uid, sk, pub_keys, threshold);
|
||||
self.key_gen = Some((key_gen, change));
|
||||
if let Some(propose) = propose {
|
||||
self.send_transaction(NodeTransaction::Propose(propose))?;
|
||||
self.send_transaction(NodeTransaction::Propose(propose))?
|
||||
.merge_into(&mut fault_log);
|
||||
}
|
||||
Ok(())
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Starts a new `HoneyBadger` instance and inputs the user transactions from the existing
|
||||
/// one's buffer and pending outputs.
|
||||
fn restart_honey_badger(&mut self, epoch: u64) -> Result<()> {
|
||||
fn restart_honey_badger(&mut self, epoch: u64) -> Result<FaultLog<NodeUid>> {
|
||||
// TODO: Filter out the messages for `epoch` and later.
|
||||
self.messages
|
||||
.extend_with_epoch(self.start_epoch, &mut self.honey_badger);
|
||||
self.start_epoch = epoch;
|
||||
let honey_badger = {
|
||||
let (honey_badger, fault_log) = {
|
||||
let netinfo = Rc::new(self.netinfo.clone());
|
||||
let old_buf = self.honey_badger.drain_buffer();
|
||||
let outputs = (self.honey_badger.output_iter()).flat_map(HbBatch::into_tx_iter);
|
||||
|
@ -465,40 +485,49 @@ where
|
|||
.build_with_transactions(buffer)?
|
||||
};
|
||||
self.honey_badger = honey_badger;
|
||||
Ok(())
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Handles a `Propose` message that was output by Honey Badger.
|
||||
fn handle_propose(&mut self, sender_id: &NodeUid, propose: Propose) -> Result<()> {
|
||||
fn handle_propose(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
propose: Propose,
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
let handle = |&mut (ref mut key_gen, _): &mut (SyncKeyGen<NodeUid>, _)| {
|
||||
key_gen.handle_propose(&sender_id, propose)
|
||||
};
|
||||
match self.key_gen.as_mut().and_then(handle) {
|
||||
Some(accept) => self.send_transaction(NodeTransaction::Accept(accept)),
|
||||
None => Ok(()),
|
||||
Some(ProposeOutcome::Valid(accept)) => {
|
||||
self.send_transaction(NodeTransaction::Accept(accept))
|
||||
}
|
||||
Some(ProposeOutcome::Invalid(fault_log)) => Ok(fault_log),
|
||||
None => Ok(FaultLog::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles an `Accept` message that was output by Honey Badger.
|
||||
fn handle_accept(&mut self, sender_id: &NodeUid, accept: Accept) -> Result<()> {
|
||||
fn handle_accept(&mut self, sender_id: &NodeUid, accept: Accept) -> Result<FaultLog<NodeUid>> {
|
||||
if let Some(&mut (ref mut key_gen, _)) = self.key_gen.as_mut() {
|
||||
key_gen.handle_accept(&sender_id, accept);
|
||||
Ok(key_gen.handle_accept(&sender_id, accept))
|
||||
} else {
|
||||
Ok(FaultLog::new())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Signs and sends a `NodeTransaction` and also tries to commit it.
|
||||
fn send_transaction(&mut self, node_tx: NodeTransaction<NodeUid>) -> Result<()> {
|
||||
fn send_transaction(&mut self, node_tx: NodeTransaction<NodeUid>) -> Result<FaultLog<NodeUid>> {
|
||||
let sig = self.sign(&node_tx)?;
|
||||
let msg = Message::Signed(self.start_epoch, node_tx.clone(), sig.clone());
|
||||
self.messages.push_back(Target::All.message(msg));
|
||||
if !self.netinfo.is_validator() {
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
let our_uid = self.netinfo.our_uid().clone();
|
||||
let hb_tx = Transaction::Signed(self.start_epoch, our_uid, node_tx, sig);
|
||||
self.honey_badger.input(hb_tx)?;
|
||||
self.process_output()
|
||||
let mut fault_log = self.honey_badger.input(hb_tx)?;
|
||||
self.process_output()?.merge_into(&mut fault_log);
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// If the current Key Generation process is ready, returns the generated key set.
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
//! Functionality for logging faulty node behavior encountered by each
|
||||
//! algorithm.
|
||||
//!
|
||||
//! Each algorithm can propogate their faulty node logs upwards to a
|
||||
//! calling algorithm via `DistAlgorihm`'s `.input()` and
|
||||
//! `.handle_message()` trait methods.
|
||||
|
||||
/// Represents each reason why a node could be considered faulty.
|
||||
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||
pub enum FaultKind {
|
||||
/// `CommonCoin` received a signature share from an unverified sender.
|
||||
UnverifiedSignatureShareSender,
|
||||
/// `HoneyBadger` received a decryption share from an unverified sender.
|
||||
UnverifiedDecryptionShareSender,
|
||||
/// `HoneyBadger` was unable to deserialize a proposer's ciphertext.
|
||||
InvalidCiphertext,
|
||||
/// `HoneyBadger` was unable to decrypt a share received from a proposer.
|
||||
ShareDecryptionFailed,
|
||||
/// `Broadcast` received a `Value` from a node other than the proposer.
|
||||
ReceivedValueFromNonProposer,
|
||||
/// `Broadcast` recevied an Echo message containing an invalid proof.
|
||||
InvalidProof,
|
||||
/// `HoneyBadger` could not deserialize bytes (i.e. a serialized Batch)
|
||||
/// from a given proposer into a vector of transactions.
|
||||
BatchDeserializationFailed,
|
||||
/// `DynamicHoneyBadger` received a node transaction with an invalid
|
||||
/// signature.
|
||||
InvalidNodeTransactionSignature,
|
||||
/// `DynamicHoneyBadger` received a message (Accept, Propose, or Change)
|
||||
/// with an invalid signature.
|
||||
IncorrectPayloadSignature,
|
||||
/// `DynamicHoneyBadger`/`SyncKeyGen` received an invalid Accept message.
|
||||
InvalidAcceptMessage,
|
||||
/// `DynamicHoneyBadger`/`SyncKeyGen` received an invalid Propose message.
|
||||
InvalidProposeMessage,
|
||||
}
|
||||
|
||||
/// A structure representing the context of a faulty node. This structure
|
||||
/// describes which node is faulty (`node_id`) and which faulty behavior
|
||||
/// that the node exhibited ('kind').
|
||||
#[derive(Clone)]
|
||||
pub struct Fault<NodeUid: Clone> {
|
||||
pub node_id: NodeUid,
|
||||
pub kind: FaultKind,
|
||||
}
|
||||
|
||||
impl<NodeUid: Clone> Fault<NodeUid> {
|
||||
pub fn new(node_id: NodeUid, kind: FaultKind) -> Self {
|
||||
Fault { node_id, kind }
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new `FaultLog` where `self` is the first element in the log
|
||||
/// vector.
|
||||
impl<NodeUid: Clone> Into<FaultLog<NodeUid>> for Fault<NodeUid> {
|
||||
fn into(self) -> FaultLog<NodeUid> {
|
||||
FaultLog(vec![self])
|
||||
}
|
||||
}
|
||||
|
||||
/// A structure used to contain reports of faulty node behavior.
|
||||
#[derive(Clone)]
|
||||
pub struct FaultLog<NodeUid: Clone>(pub Vec<Fault<NodeUid>>);
|
||||
|
||||
impl<NodeUid: Clone> FaultLog<NodeUid> {
|
||||
/// Creates an empty `FaultLog`.
|
||||
pub fn new() -> Self {
|
||||
FaultLog::default()
|
||||
}
|
||||
|
||||
/// Creates a new `FaultLog` initialized with a single log.
|
||||
pub fn init(node_id: NodeUid, kind: FaultKind) -> Self {
|
||||
Fault::new(node_id, kind).into()
|
||||
}
|
||||
|
||||
/// Creates a new `Fault` and pushes it onto the fault log.
|
||||
pub fn append(&mut self, node_id: NodeUid, kind: FaultKind) {
|
||||
self.0.push(Fault::new(node_id, kind));
|
||||
}
|
||||
|
||||
/// Consumes `new_logs`, appending its logs onto the end of `self`.
|
||||
pub fn extend(&mut self, new_logs: FaultLog<NodeUid>) {
|
||||
self.0.extend(new_logs.0);
|
||||
}
|
||||
|
||||
/// Consumes `self`, appending its logs onto the end of `logs`.
|
||||
pub fn merge_into(self, logs: &mut FaultLog<NodeUid>) {
|
||||
logs.extend(self);
|
||||
}
|
||||
}
|
||||
|
||||
impl<NodeUid: Clone> Default for FaultLog<NodeUid> {
|
||||
fn default() -> Self {
|
||||
FaultLog(vec![])
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@ use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
|
|||
use std::fmt::Debug;
|
||||
use std::hash::Hash;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::Not;
|
||||
use std::rc::Rc;
|
||||
use std::{cmp, iter, mem};
|
||||
|
||||
|
@ -13,6 +14,7 @@ use serde::{Deserialize, Serialize};
|
|||
|
||||
use common_subset::{self, CommonSubset};
|
||||
use crypto::{Ciphertext, DecryptionShare};
|
||||
use fault_log::{FaultKind, FaultLog};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
|
||||
|
||||
error_chain!{
|
||||
|
@ -73,7 +75,7 @@ where
|
|||
}
|
||||
|
||||
/// Creates a new Honey Badger instance with an empty buffer.
|
||||
pub fn build(&self) -> HoneyBadgerResult<HoneyBadger<Tx, NodeUid>>
|
||||
pub fn build(&self) -> HoneyBadgerResult<(HoneyBadger<Tx, NodeUid>, FaultLog<NodeUid>)>
|
||||
where
|
||||
Tx: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq,
|
||||
{
|
||||
|
@ -84,7 +86,7 @@ where
|
|||
pub fn build_with_transactions<TI>(
|
||||
&self,
|
||||
txs: TI,
|
||||
) -> HoneyBadgerResult<HoneyBadger<Tx, NodeUid>>
|
||||
) -> HoneyBadgerResult<(HoneyBadger<Tx, NodeUid>, FaultLog<NodeUid>)>
|
||||
where
|
||||
TI: IntoIterator<Item = Tx>,
|
||||
Tx: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq,
|
||||
|
@ -104,8 +106,8 @@ where
|
|||
ciphertexts: BTreeMap::new(),
|
||||
};
|
||||
honey_badger.buffer.extend(txs);
|
||||
honey_badger.propose()?;
|
||||
Ok(honey_badger)
|
||||
let fault_log = honey_badger.propose()?;
|
||||
Ok((honey_badger, fault_log))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -153,23 +155,23 @@ where
|
|||
type Message = Message<NodeUid>;
|
||||
type Error = Error;
|
||||
|
||||
fn input(&mut self, input: Self::Input) -> HoneyBadgerResult<()> {
|
||||
fn input(&mut self, input: Self::Input) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
self.add_transactions(iter::once(input));
|
||||
Ok(())
|
||||
Ok(FaultLog::new())
|
||||
}
|
||||
|
||||
fn handle_message(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
message: Self::Message,
|
||||
) -> HoneyBadgerResult<()> {
|
||||
) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
if !self.netinfo.all_uids().contains(sender_id) {
|
||||
return Err(ErrorKind::UnknownSender.into());
|
||||
}
|
||||
let Message { epoch, content } = message;
|
||||
if epoch < self.epoch {
|
||||
// Ignore all messages from past epochs.
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
if epoch > self.epoch + self.max_future_epochs {
|
||||
// Postpone handling this message.
|
||||
|
@ -177,7 +179,7 @@ where
|
|||
.entry(epoch)
|
||||
.or_insert_with(Vec::new)
|
||||
.push((sender_id.clone(), content));
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
self.handle_message_content(sender_id, epoch, content)
|
||||
}
|
||||
|
@ -221,9 +223,9 @@ where
|
|||
}
|
||||
|
||||
/// Proposes a new batch in the current epoch.
|
||||
fn propose(&mut self) -> HoneyBadgerResult<()> {
|
||||
fn propose(&mut self) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
if !self.netinfo.is_validator() {
|
||||
return Ok(());
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
let proposal = self.choose_transactions()?;
|
||||
let cs = match self.common_subsets.entry(self.epoch) {
|
||||
|
@ -233,9 +235,9 @@ where
|
|||
}
|
||||
};
|
||||
let ciphertext = self.netinfo.public_key_set().public_key().encrypt(proposal);
|
||||
cs.input(bincode::serialize(&ciphertext).unwrap())?;
|
||||
let fault_log = cs.input(bincode::serialize(&ciphertext).unwrap())?;
|
||||
self.messages.extend_with_epoch(self.epoch, cs);
|
||||
Ok(())
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Returns a random choice of `batch_size / all_uids.len()` buffered transactions, and
|
||||
|
@ -263,7 +265,7 @@ where
|
|||
sender_id: &NodeUid,
|
||||
epoch: u64,
|
||||
content: MessageContent<NodeUid>,
|
||||
) -> HoneyBadgerResult<()> {
|
||||
) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
match content {
|
||||
MessageContent::CommonSubset(cs_msg) => {
|
||||
self.handle_common_subset_message(sender_id, epoch, cs_msg)
|
||||
|
@ -280,29 +282,32 @@ where
|
|||
sender_id: &NodeUid,
|
||||
epoch: u64,
|
||||
message: common_subset::Message<NodeUid>,
|
||||
) -> HoneyBadgerResult<()> {
|
||||
) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
{
|
||||
// Borrow the instance for `epoch`, or create it.
|
||||
let cs = match self.common_subsets.entry(epoch) {
|
||||
Entry::Occupied(entry) => entry.into_mut(),
|
||||
Entry::Vacant(entry) => {
|
||||
if epoch < self.epoch {
|
||||
return Ok(()); // Epoch has already terminated. Message is obsolete.
|
||||
// Epoch has already terminated. Message is obsolete.
|
||||
return Ok(fault_log);
|
||||
} else {
|
||||
entry.insert(CommonSubset::new(self.netinfo.clone(), epoch)?)
|
||||
}
|
||||
}
|
||||
};
|
||||
// Handle the message and put the outgoing messages into the queue.
|
||||
cs.handle_message(sender_id, message)?;
|
||||
cs.handle_message(sender_id, message)?
|
||||
.merge_into(&mut fault_log);
|
||||
self.messages.extend_with_epoch(epoch, cs);
|
||||
}
|
||||
// If this is the current epoch, the message could cause a new output.
|
||||
if epoch == self.epoch {
|
||||
self.process_output()?;
|
||||
self.process_output()?.merge_into(&mut fault_log);
|
||||
}
|
||||
self.remove_terminated(epoch);
|
||||
Ok(())
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Handles decryption shares sent by `HoneyBadger` instances.
|
||||
|
@ -312,15 +317,18 @@ where
|
|||
epoch: u64,
|
||||
proposer_id: NodeUid,
|
||||
share: DecryptionShare,
|
||||
) -> HoneyBadgerResult<()> {
|
||||
) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
|
||||
if let Some(ciphertext) = self
|
||||
.ciphertexts
|
||||
.get(&self.epoch)
|
||||
.and_then(|cts| cts.get(&proposer_id))
|
||||
{
|
||||
if !self.verify_decryption_share(sender_id, &share, ciphertext) {
|
||||
// TODO: Log the incorrect sender.
|
||||
return Ok(());
|
||||
let fault_kind = FaultKind::UnverifiedDecryptionShareSender;
|
||||
fault_log.append(sender_id.clone(), fault_kind);
|
||||
return Ok(fault_log);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -336,10 +344,12 @@ where
|
|||
}
|
||||
|
||||
if epoch == self.epoch && self.try_decrypt_proposer_selection(proposer_id) {
|
||||
self.try_output_batch()?;
|
||||
if let BoolWithFaultLog::True(faults) = self.try_output_batch()? {
|
||||
fault_log.extend(faults);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Verifies a given decryption share using the sender's public key and the proposer's
|
||||
|
@ -360,22 +370,28 @@ where
|
|||
|
||||
/// When selections of transactions have been decrypted for all valid proposers in this epoch,
|
||||
/// moves those transactions into a batch, outputs the batch and updates the epoch.
|
||||
fn try_output_batch(&mut self) -> HoneyBadgerResult<bool> {
|
||||
fn try_output_batch(&mut self) -> HoneyBadgerResult<BoolWithFaultLog<NodeUid>> {
|
||||
// Wait until selections have been successfully decoded for all proposer nodes with correct
|
||||
// ciphertext outputs.
|
||||
if !self.all_selections_decrypted() {
|
||||
return Ok(false);
|
||||
return Ok(BoolWithFaultLog::False);
|
||||
}
|
||||
|
||||
// Deserialize the output.
|
||||
let mut fault_log = FaultLog::new();
|
||||
let transactions: BTreeMap<NodeUid, Vec<Tx>> = self
|
||||
.decrypted_selections
|
||||
.iter()
|
||||
.flat_map(|(proposer_id, ser_batch)| {
|
||||
// If deserialization fails, the proposer of that batch is faulty. Ignore it.
|
||||
bincode::deserialize::<Vec<Tx>>(&ser_batch)
|
||||
.ok()
|
||||
.map(|proposed| (proposer_id.clone(), proposed))
|
||||
// If deserialization fails, the proposer of that batch is
|
||||
// faulty. Log the faulty proposer and ignore the batch.
|
||||
if let Ok(proposed) = bincode::deserialize::<Vec<Tx>>(&ser_batch) {
|
||||
Some((proposer_id.clone(), proposed))
|
||||
} else {
|
||||
let fault_kind = FaultKind::BatchDeserializationFailed;
|
||||
fault_log.append(proposer_id.clone(), fault_kind);
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let batch = Batch {
|
||||
|
@ -395,35 +411,40 @@ where
|
|||
);
|
||||
// Queue the output and advance the epoch.
|
||||
self.output.push_back(batch);
|
||||
self.update_epoch()?;
|
||||
Ok(true)
|
||||
self.update_epoch()?.merge_into(&mut fault_log);
|
||||
Ok(BoolWithFaultLog::True(fault_log))
|
||||
}
|
||||
|
||||
/// Increments the epoch number and clears any state that is local to the finished epoch.
|
||||
fn update_epoch(&mut self) -> HoneyBadgerResult<()> {
|
||||
fn update_epoch(&mut self) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
// Clear the state of the old epoch.
|
||||
self.ciphertexts.remove(&self.epoch);
|
||||
self.decrypted_selections.clear();
|
||||
self.received_shares.remove(&self.epoch);
|
||||
self.epoch += 1;
|
||||
let max_epoch = self.epoch + self.max_future_epochs;
|
||||
let mut fault_log = FaultLog::new();
|
||||
// TODO: Once stable, use `Iterator::flatten`.
|
||||
for (sender_id, content) in
|
||||
Itertools::flatten(self.incoming_queue.remove(&max_epoch).into_iter())
|
||||
{
|
||||
self.handle_message_content(&sender_id, max_epoch, content)?;
|
||||
self.handle_message_content(&sender_id, max_epoch, content)?
|
||||
.merge_into(&mut fault_log);
|
||||
}
|
||||
// Handle any decryption shares received for the new epoch.
|
||||
if !self.try_decrypt_and_output_batch()? {
|
||||
// Continue with this epoch if a batch is not output by `try_decrypt_and_output_batch`.
|
||||
self.propose()?;
|
||||
if let BoolWithFaultLog::True(faults) = self.try_decrypt_and_output_batch()? {
|
||||
fault_log.extend(faults);
|
||||
} else {
|
||||
// Continue with this epoch if a batch is not output by
|
||||
// `try_decrypt_and_output_batch`.
|
||||
self.propose()?.merge_into(&mut fault_log);
|
||||
}
|
||||
Ok(())
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Tries to decrypt transaction selections from all proposers and output those transactions in
|
||||
/// a batch.
|
||||
fn try_decrypt_and_output_batch(&mut self) -> HoneyBadgerResult<bool> {
|
||||
fn try_decrypt_and_output_batch(&mut self) -> HoneyBadgerResult<BoolWithFaultLog<NodeUid>> {
|
||||
if let Some(proposer_ids) = self
|
||||
.received_shares
|
||||
.get(&self.epoch)
|
||||
|
@ -438,10 +459,10 @@ where
|
|||
{
|
||||
self.try_output_batch()
|
||||
} else {
|
||||
Ok(false)
|
||||
Ok(BoolWithFaultLog::False)
|
||||
}
|
||||
} else {
|
||||
Ok(false)
|
||||
Ok(BoolWithFaultLog::False)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -495,23 +516,27 @@ where
|
|||
fn send_decryption_shares(
|
||||
&mut self,
|
||||
cs_output: BTreeMap<NodeUid, Vec<u8>>,
|
||||
) -> Result<(), Error> {
|
||||
) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
for (proposer_id, v) in cs_output {
|
||||
let mut ciphertext: Ciphertext;
|
||||
if let Ok(ct) = bincode::deserialize(&v) {
|
||||
ciphertext = ct;
|
||||
} else {
|
||||
warn!("Invalid ciphertext from proposer {:?} ignored", proposer_id);
|
||||
// TODO: Log the incorrect node `j`.
|
||||
let fault_kind = FaultKind::InvalidCiphertext;
|
||||
fault_log.append(proposer_id.clone(), fault_kind);
|
||||
continue;
|
||||
}
|
||||
let incorrect_senders =
|
||||
let (incorrect_senders, faults) =
|
||||
self.verify_pending_decryption_shares(&proposer_id, &ciphertext);
|
||||
self.remove_incorrect_decryption_shares(&proposer_id, incorrect_senders);
|
||||
fault_log.extend(faults);
|
||||
|
||||
if !self.send_decryption_share(&proposer_id, &ciphertext)? {
|
||||
warn!("Share decryption failed for proposer {:?}", proposer_id);
|
||||
// TODO: Log the decryption failure.
|
||||
let fault_kind = FaultKind::ShareDecryptionFailed;
|
||||
fault_log.append(proposer_id.clone(), fault_kind);
|
||||
continue;
|
||||
}
|
||||
let ciphertexts = self
|
||||
|
@ -520,7 +545,7 @@ where
|
|||
.or_insert_with(BTreeMap::new);
|
||||
ciphertexts.insert(proposer_id, ciphertext);
|
||||
}
|
||||
Ok(())
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Verifies the ciphertext and sends decryption shares. Returns whether it is valid.
|
||||
|
@ -528,12 +553,12 @@ where
|
|||
&mut self,
|
||||
proposer_id: &NodeUid,
|
||||
ciphertext: &Ciphertext,
|
||||
) -> HoneyBadgerResult<bool> {
|
||||
) -> HoneyBadgerResult<BoolWithFaultLog<NodeUid>> {
|
||||
if !self.netinfo.is_validator() {
|
||||
return Ok(ciphertext.verify());
|
||||
return Ok(ciphertext.verify().into());
|
||||
}
|
||||
let share = match self.netinfo.secret_key().decrypt_share(&ciphertext) {
|
||||
None => return Ok(false),
|
||||
None => return Ok(BoolWithFaultLog::False),
|
||||
Some(share) => share,
|
||||
};
|
||||
// Send the share to remote nodes.
|
||||
|
@ -546,8 +571,8 @@ where
|
|||
let our_id = self.netinfo.our_uid().clone();
|
||||
let epoch = self.epoch;
|
||||
// Receive the share locally.
|
||||
self.handle_decryption_share_message(&our_id, epoch, proposer_id.clone(), share)?;
|
||||
Ok(true)
|
||||
self.handle_decryption_share_message(&our_id, epoch, proposer_id.clone(), share)
|
||||
.map(|fault_log| fault_log.into())
|
||||
}
|
||||
|
||||
/// Verifies the shares of the current epoch that are pending verification. Returned are the
|
||||
|
@ -556,8 +581,9 @@ where
|
|||
&self,
|
||||
proposer_id: &NodeUid,
|
||||
ciphertext: &Ciphertext,
|
||||
) -> BTreeSet<NodeUid> {
|
||||
) -> (BTreeSet<NodeUid>, FaultLog<NodeUid>) {
|
||||
let mut incorrect_senders = BTreeSet::new();
|
||||
let mut fault_log = FaultLog::new();
|
||||
if let Some(sender_shares) = self
|
||||
.received_shares
|
||||
.get(&self.epoch)
|
||||
|
@ -565,11 +591,13 @@ where
|
|||
{
|
||||
for (sender_id, share) in sender_shares {
|
||||
if !self.verify_decryption_share(sender_id, share, ciphertext) {
|
||||
let fault_kind = FaultKind::UnverifiedDecryptionShareSender;
|
||||
fault_log.append(sender_id.clone(), fault_kind);
|
||||
incorrect_senders.insert(sender_id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
incorrect_senders
|
||||
(incorrect_senders, fault_log)
|
||||
}
|
||||
|
||||
fn remove_incorrect_decryption_shares(
|
||||
|
@ -589,12 +617,14 @@ where
|
|||
}
|
||||
|
||||
/// Checks whether the current epoch has output, and if it does, sends out our decryption shares.
|
||||
fn process_output(&mut self) -> Result<(), Error> {
|
||||
fn process_output(&mut self) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
if let Some(cs_output) = self.take_current_output() {
|
||||
self.send_decryption_shares(cs_output)?;
|
||||
self.send_decryption_shares(cs_output)?
|
||||
.merge_into(&mut fault_log);
|
||||
// TODO: May also check that there is no further output from Common Subset.
|
||||
}
|
||||
Ok(())
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Returns the output of the current epoch's `CommonSubset` instance, if any.
|
||||
|
@ -701,3 +731,37 @@ impl<NodeUid: Clone + Debug + Ord> MessageQueue<NodeUid> {
|
|||
self.extend(cs.message_iter().map(convert));
|
||||
}
|
||||
}
|
||||
|
||||
// The return type for `HoneyBadger` methods that return a boolean and a
|
||||
// fault log.
|
||||
enum BoolWithFaultLog<NodeUid: Clone> {
|
||||
True(FaultLog<NodeUid>),
|
||||
False,
|
||||
}
|
||||
|
||||
impl<NodeUid: Clone> Into<BoolWithFaultLog<NodeUid>> for bool {
|
||||
fn into(self) -> BoolWithFaultLog<NodeUid> {
|
||||
if self {
|
||||
BoolWithFaultLog::True(FaultLog::new())
|
||||
} else {
|
||||
BoolWithFaultLog::False
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<NodeUid: Clone> Into<BoolWithFaultLog<NodeUid>> for FaultLog<NodeUid> {
|
||||
fn into(self) -> BoolWithFaultLog<NodeUid> {
|
||||
BoolWithFaultLog::True(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<NodeUid: Clone> Not for BoolWithFaultLog<NodeUid> {
|
||||
type Output = bool;
|
||||
|
||||
fn not(self) -> Self::Output {
|
||||
match self {
|
||||
BoolWithFaultLog::False => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -126,6 +126,7 @@ pub mod common_coin;
|
|||
pub mod common_subset;
|
||||
pub mod crypto;
|
||||
pub mod dynamic_honey_badger;
|
||||
pub mod fault_log;
|
||||
mod fmt;
|
||||
pub mod honey_badger;
|
||||
pub mod messaging;
|
||||
|
|
|
@ -4,6 +4,7 @@ use std::fmt::Debug;
|
|||
use clear_on_drop::ClearOnDrop;
|
||||
|
||||
use crypto::{PublicKey, PublicKeySet, SecretKey};
|
||||
use fault_log::FaultLog;
|
||||
|
||||
/// Message sent by a given source.
|
||||
#[derive(Clone, Debug)]
|
||||
|
@ -65,14 +66,14 @@ pub trait DistAlgorithm {
|
|||
type Error: Debug;
|
||||
|
||||
/// Handles an input provided by the user, and returns
|
||||
fn input(&mut self, input: Self::Input) -> Result<(), Self::Error>;
|
||||
fn input(&mut self, input: Self::Input) -> Result<FaultLog<Self::NodeUid>, Self::Error>;
|
||||
|
||||
/// Handles a message received from node `sender_id`.
|
||||
fn handle_message(
|
||||
&mut self,
|
||||
sender_id: &Self::NodeUid,
|
||||
message: Self::Message,
|
||||
) -> Result<(), Self::Error>;
|
||||
) -> Result<FaultLog<Self::NodeUid>, Self::Error>;
|
||||
|
||||
/// Returns a message that needs to be sent to another node.
|
||||
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, Self::NodeUid>>;
|
||||
|
|
|
@ -45,6 +45,7 @@ use rand::OsRng;
|
|||
use crypto::poly::{BivarCommitment, BivarPoly, Poly};
|
||||
use crypto::serde_impl::field_vec::FieldWrap;
|
||||
use crypto::{Ciphertext, PublicKey, PublicKeySet, SecretKey};
|
||||
use fault_log::{FaultKind, FaultLog};
|
||||
|
||||
// TODO: No need to send our own row and value to ourselves.
|
||||
|
||||
|
@ -97,6 +98,16 @@ impl ProposalState {
|
|||
}
|
||||
}
|
||||
|
||||
/// Returned from `SyncKeyGen.handle_propose()`.
|
||||
pub enum ProposeOutcome<NodeUid: Clone> {
|
||||
// If the Propose message passed to `handle_propose()` is valid, an
|
||||
// Accept message is returned.
|
||||
Valid(Accept),
|
||||
// If the Propose message passed to `handle_propose()` is invalid, the
|
||||
// fault is logged and passed onto the caller.
|
||||
Invalid(FaultLog<NodeUid>),
|
||||
}
|
||||
|
||||
/// A synchronous algorithm for dealerless distributed key generation.
|
||||
///
|
||||
/// It requires that all nodes handle all messages in the exact same order.
|
||||
|
@ -113,7 +124,7 @@ pub struct SyncKeyGen<NodeUid> {
|
|||
threshold: usize,
|
||||
}
|
||||
|
||||
impl<NodeUid: Ord + Debug> SyncKeyGen<NodeUid> {
|
||||
impl<NodeUid: Ord + Clone + Debug> SyncKeyGen<NodeUid> {
|
||||
/// Creates a new `SyncKeyGen` instance, together with the `Propose` message that should be
|
||||
/// broadcast, if we are a peer.
|
||||
pub fn new(
|
||||
|
@ -153,7 +164,7 @@ impl<NodeUid: Ord + Debug> SyncKeyGen<NodeUid> {
|
|||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
Propose(commit, rows): Propose,
|
||||
) -> Option<Accept> {
|
||||
) -> Option<ProposeOutcome<NodeUid>> {
|
||||
let sender_idx = self.node_index(sender_id)?;
|
||||
let opt_commit_row = self.our_idx.map(|idx| commit.row(idx + 1));
|
||||
match self.proposals.entry(sender_idx) {
|
||||
|
@ -166,10 +177,17 @@ impl<NodeUid: Ord + Debug> SyncKeyGen<NodeUid> {
|
|||
let our_idx = self.our_idx?;
|
||||
let commit_row = opt_commit_row?;
|
||||
let ser_row = self.sec_key.decrypt(rows.get(our_idx as usize)?)?;
|
||||
let row: Poly = bincode::deserialize(&ser_row).ok()?; // Ignore invalid messages.
|
||||
let row: Poly = if let Ok(row) = bincode::deserialize(&ser_row) {
|
||||
row
|
||||
} else {
|
||||
// Log the faulty node and ignore invalid messages.
|
||||
let fault_log = FaultLog::init(sender_id.clone(), FaultKind::InvalidProposeMessage);
|
||||
return Some(ProposeOutcome::Invalid(fault_log));
|
||||
};
|
||||
if row.commitment() != commit_row {
|
||||
debug!("Invalid proposal from node {}.", sender_idx);
|
||||
return None;
|
||||
let fault_log = FaultLog::init(sender_id.clone(), FaultKind::InvalidProposeMessage);
|
||||
return Some(ProposeOutcome::Invalid(fault_log));
|
||||
}
|
||||
// The row is valid: now encrypt one value for each node.
|
||||
let encrypt = |(idx, pk): (usize, &PublicKey)| {
|
||||
|
@ -180,16 +198,19 @@ impl<NodeUid: Ord + Debug> SyncKeyGen<NodeUid> {
|
|||
pk.encrypt(ser_val)
|
||||
};
|
||||
let values = self.pub_keys.values().enumerate().map(encrypt).collect();
|
||||
Some(Accept(sender_idx, values))
|
||||
Some(ProposeOutcome::Valid(Accept(sender_idx, values)))
|
||||
}
|
||||
|
||||
/// Handles an `Accept` message.
|
||||
pub fn handle_accept(&mut self, sender_id: &NodeUid, accept: Accept) {
|
||||
pub fn handle_accept(&mut self, sender_id: &NodeUid, accept: Accept) -> FaultLog<NodeUid> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
if let Some(sender_idx) = self.node_index(sender_id) {
|
||||
if let Err(err) = self.handle_accept_or_err(sender_idx, accept) {
|
||||
debug!("Invalid accept from node {}: {}", sender_idx, err);
|
||||
fault_log.append(sender_id.clone(), FaultKind::InvalidAcceptMessage);
|
||||
}
|
||||
}
|
||||
fault_log
|
||||
}
|
||||
|
||||
/// Returns the number of complete proposals. If this is at least `threshold + 1`, the keys can
|
||||
|
|
|
@ -112,6 +112,7 @@ fn new_dynamic_hb(netinfo: Rc<NetworkInfo<NodeUid>>) -> DynamicHoneyBadger<usize
|
|||
.batch_size(12)
|
||||
.build()
|
||||
.expect("Instantiate dynamic_honey_badger")
|
||||
.0
|
||||
}
|
||||
|
||||
fn test_dynamic_honey_badger_different_sizes<A, F>(new_adversary: F, num_txs: usize)
|
||||
|
|
|
@ -185,6 +185,7 @@ fn new_honey_badger(netinfo: Rc<NetworkInfo<NodeUid>>) -> HoneyBadger<usize, Nod
|
|||
.batch_size(12)
|
||||
.build_with_transactions(0..5)
|
||||
.expect("Instantiate honey_badger")
|
||||
.0
|
||||
}
|
||||
|
||||
fn test_honey_badger_different_sizes<A, F>(new_adversary: F, num_txs: usize)
|
||||
|
|
|
@ -8,7 +8,7 @@ extern crate rand;
|
|||
use std::collections::BTreeMap;
|
||||
|
||||
use hbbft::crypto::{PublicKey, SecretKey};
|
||||
use hbbft::sync_key_gen::SyncKeyGen;
|
||||
use hbbft::sync_key_gen::{ProposeOutcome, SyncKeyGen};
|
||||
|
||||
fn test_sync_key_gen_with(threshold: usize, node_num: usize) {
|
||||
// Generate individual key pairs for encryption. These are not suitable for threshold schemes.
|
||||
|
@ -35,9 +35,11 @@ fn test_sync_key_gen_with(threshold: usize, node_num: usize) {
|
|||
let mut accepts = Vec::new();
|
||||
for (sender_id, proposal) in proposals[..=threshold].iter().enumerate() {
|
||||
for (node_id, node) in nodes.iter_mut().enumerate() {
|
||||
let accept = node
|
||||
.handle_propose(&sender_id, proposal.clone().expect("proposal"))
|
||||
.expect("valid proposal");
|
||||
let proposal = proposal.clone().expect("proposal");
|
||||
let accept = match node.handle_propose(&sender_id, proposal) {
|
||||
Some(ProposeOutcome::Valid(accept)) => accept,
|
||||
_ => panic!("invalid proposal"),
|
||||
};
|
||||
// Only the first `threshold + 1` manage to commit their `Accept`s.
|
||||
if node_id <= 2 * threshold {
|
||||
accepts.push((node_id, accept));
|
||||
|
|
Loading…
Reference in New Issue