Merge pull request #298 from poanetwork/afck-api

Make the BA session ID generic.
This commit is contained in:
Vladimir Komendantskiy 2018-10-29 09:57:07 +00:00 committed by GitHub
commit 5eeb06aac8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 189 additions and 141 deletions

View File

@ -1,12 +1,16 @@
use std::collections::BTreeMap;
use std::fmt::{self, Display};
use std::result;
use std::sync::Arc;
use bincode;
use super::bool_multimap::BoolMultimap;
use super::bool_set::BoolSet;
use super::sbv_broadcast::{self, SbvBroadcast};
use super::{Error, Message, MessageContent, Nonce, Result, Step};
use super::{Error, Message, MessageContent, Result, Step};
use threshold_sign::{self, ThresholdSign};
use {DistAlgorithm, NetworkInfo, NodeIdT, Target};
use {DistAlgorithm, NetworkInfo, NodeIdT, SessionIdT, Target};
/// The state of the current epoch's coin. In some epochs this is fixed, in others it starts
/// with in `InProgress`.
@ -36,13 +40,11 @@ impl<N> From<bool> for CoinState<N> {
/// Binary Agreement instance
#[derive(Debug)]
pub struct BinaryAgreement<N> {
pub struct BinaryAgreement<N, S> {
/// Shared network information.
netinfo: Arc<NetworkInfo<N>>,
/// Session ID, e.g, the Honey Badger algorithm epoch.
session_id: u64,
/// The ID of the proposer of the value for this Binary Agreement instance.
proposer_id: N,
/// Session identifier, to prevent replaying messages in other instances.
session_id: S,
/// Binary Agreement algorithm epoch.
epoch: u32,
/// This epoch's Synchronized Binary Value Broadcast instance.
@ -69,19 +71,19 @@ pub struct BinaryAgreement<N> {
coin_state: CoinState<N>,
}
impl<N: NodeIdT> DistAlgorithm for BinaryAgreement<N> {
impl<N: NodeIdT, S: SessionIdT> DistAlgorithm for BinaryAgreement<N, S> {
type NodeId = N;
type Input = bool;
type Output = bool;
type Message = Message;
type Error = Error;
fn handle_input(&mut self, input: Self::Input) -> Result<Step<N>> {
fn handle_input(&mut self, input: Self::Input) -> Result<Step<N, S>> {
self.propose(input)
}
/// Receive input from a remote node.
fn handle_message(&mut self, sender_id: &Self::NodeId, msg: Message) -> Result<Step<N>> {
fn handle_message(&mut self, sender_id: &Self::NodeId, msg: Message) -> Result<Step<N, S>> {
self.handle_message(sender_id, msg)
}
@ -95,19 +97,13 @@ impl<N: NodeIdT> DistAlgorithm for BinaryAgreement<N> {
}
}
impl<N: NodeIdT> BinaryAgreement<N> {
/// Creates a new `BinaryAgreement` instance. The `session_id` and `proposer_id` are used to
/// uniquely identify this instance: its messages cannot be replayed in an instance with
/// different values.
// TODO: Use a generic type argument for that instead of something `Subset`-specific.
pub fn new(netinfo: Arc<NetworkInfo<N>>, session_id: u64, proposer_id: N) -> Result<Self> {
if !netinfo.is_node_validator(&proposer_id) {
return Err(Error::UnknownProposer);
}
impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
/// Creates a new `BinaryAgreement` instance with the given session identifier, to prevent
/// replaying messages in other instances.
pub fn new(netinfo: Arc<NetworkInfo<N>>, session_id: S) -> Result<Self> {
Ok(BinaryAgreement {
netinfo: netinfo.clone(),
session_id,
proposer_id,
epoch: 0,
sbv_broadcast: SbvBroadcast::new(netinfo),
received_conf: BTreeMap::new(),
@ -126,13 +122,13 @@ impl<N: NodeIdT> BinaryAgreement<N> {
/// output. Otherwise either output is possible.
///
/// Note that if `can_propose` returns `false`, it is already too late to affect the outcome.
pub fn propose(&mut self, input: bool) -> Result<Step<N>> {
pub fn propose(&mut self, input: bool) -> Result<Step<N, S>> {
if !self.can_propose() {
return Ok(Step::default());
}
// Set the initial estimated value to the input value.
self.estimated = Some(input);
debug!("{:?}/{:?} Input {}", self.our_id(), self.proposer_id, input);
debug!("{}: Input {}", self, input);
let sbvb_step = self.sbv_broadcast.handle_input(input)?;
self.handle_sbvb_step(sbvb_step)
}
@ -140,7 +136,7 @@ impl<N: NodeIdT> BinaryAgreement<N> {
/// Handles a message received from `sender_id`.
///
/// This must be called with every message we receive from another node.
pub fn handle_message(&mut self, sender_id: &N, msg: Message) -> Result<Step<N>> {
pub fn handle_message(&mut self, sender_id: &N, msg: Message) -> Result<Step<N, S>> {
let Message { epoch, content } = msg;
if self.decision.is_some() || (epoch < self.epoch && content.can_expire()) {
// Message is obsolete: We are already in a later epoch or terminated.
@ -166,7 +162,7 @@ impl<N: NodeIdT> BinaryAgreement<N> {
&mut self,
sender_id: &N,
content: MessageContent,
) -> Result<Step<N>> {
) -> Result<Step<N, S>> {
match content {
MessageContent::SbvBroadcast(msg) => self.handle_sbv_broadcast(sender_id, msg),
MessageContent::Conf(v) => self.handle_conf(sender_id, v),
@ -180,14 +176,14 @@ impl<N: NodeIdT> BinaryAgreement<N> {
&mut self,
sender_id: &N,
msg: sbv_broadcast::Message,
) -> Result<Step<N>> {
) -> Result<Step<N, S>> {
let sbvb_step = self.sbv_broadcast.handle_message(sender_id, msg)?;
self.handle_sbvb_step(sbvb_step)
}
/// Handles a Synchronized Binary Value Broadcast step. On output, starts the `Conf` round or
/// decides.
fn handle_sbvb_step(&mut self, sbvb_step: sbv_broadcast::Step<N>) -> Result<Step<N>> {
fn handle_sbvb_step(&mut self, sbvb_step: sbv_broadcast::Step<N>) -> Result<Step<N, S>> {
let mut step = Step::default();
let output = step.extend_with(sbvb_step, |msg| {
MessageContent::SbvBroadcast(msg).with_epoch(self.epoch)
@ -213,7 +209,7 @@ impl<N: NodeIdT> BinaryAgreement<N> {
/// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have
/// been received, updates the epoch or decides.
fn handle_conf(&mut self, sender_id: &N, v: BoolSet) -> Result<Step<N>> {
fn handle_conf(&mut self, sender_id: &N, v: BoolSet) -> Result<Step<N, S>> {
self.received_conf.insert(sender_id.clone(), v);
self.try_finish_conf_round()
}
@ -221,7 +217,7 @@ impl<N: NodeIdT> BinaryAgreement<N> {
/// Handles a `Term(v)` message. If we haven't yet decided on a value and there are more than
/// _f_ such messages with the same value from different nodes, performs expedite termination:
/// decides on `v`, broadcasts `Term(v)` and terminates the instance.
fn handle_term(&mut self, sender_id: &N, b: bool) -> Result<Step<N>> {
fn handle_term(&mut self, sender_id: &N, b: bool) -> Result<Step<N, S>> {
self.received_term[b].insert(sender_id.clone());
// Check for the expedite termination condition.
if self.decision.is_some() {
@ -239,7 +235,7 @@ impl<N: NodeIdT> BinaryAgreement<N> {
/// Handles a `ThresholdSign` message. If there is output, starts the next epoch. The function
/// may output a decision value.
fn handle_coin(&mut self, sender_id: &N, msg: threshold_sign::Message) -> Result<Step<N>> {
fn handle_coin(&mut self, sender_id: &N, msg: threshold_sign::Message) -> Result<Step<N, S>> {
let ts_step = match self.coin_state {
CoinState::Decided(_) => return Ok(Step::default()), // Coin value is already decided.
CoinState::InProgress(ref mut ts) => ts
@ -250,7 +246,7 @@ impl<N: NodeIdT> BinaryAgreement<N> {
}
/// Multicasts a `Conf(values)` message, and handles it.
fn send_conf(&mut self, values: BoolSet) -> Result<Step<N>> {
fn send_conf(&mut self, values: BoolSet) -> Result<Step<N, S>> {
if self.conf_values.is_some() {
// Only one `Conf` message is allowed in an epoch.
return Ok(Step::default());
@ -267,11 +263,11 @@ impl<N: NodeIdT> BinaryAgreement<N> {
}
/// Multicasts and handles a message. Does nothing if we are only an observer.
fn send(&mut self, content: MessageContent) -> Result<Step<N>> {
fn send(&mut self, content: MessageContent) -> Result<Step<N, S>> {
if !self.netinfo.is_validator() {
return Ok(Step::default());
}
let step: Step<_> = Target::All
let step: Step<N, S> = Target::All
.message(content.clone().with_epoch(self.epoch))
.into();
let our_id = &self.our_id().clone();
@ -279,7 +275,7 @@ impl<N: NodeIdT> BinaryAgreement<N> {
}
/// Handles a step returned from the `ThresholdSign`.
fn on_coin_step(&mut self, ts_step: threshold_sign::Step<N>) -> Result<Step<N>> {
fn on_coin_step(&mut self, ts_step: threshold_sign::Step<N>) -> Result<Step<N, S>> {
let mut step = Step::default();
let epoch = self.epoch;
let to_msg = |c_msg| MessageContent::Coin(Box::new(c_msg)).with_epoch(epoch);
@ -298,7 +294,7 @@ impl<N: NodeIdT> BinaryAgreement<N> {
/// With two conf values, the next epoch's estimate is the coin value. If there is only one conf
/// value and that disagrees with the coin, the conf value is the next epoch's estimate. If
/// the unique conf value agrees with the coin, terminates and decides on that value.
fn try_update_epoch(&mut self) -> Result<Step<N>> {
fn try_update_epoch(&mut self) -> Result<Step<N, S>> {
if self.decision.is_some() {
// Avoid an infinite regression without making a Binary Agreement step.
return Ok(Step::default());
@ -321,24 +317,19 @@ impl<N: NodeIdT> BinaryAgreement<N> {
/// Creates the initial coin state for the current epoch, i.e. sets it to the predetermined
/// value, or initializes a `ThresholdSign` instance.
fn coin_state(&self) -> CoinState<N> {
match self.epoch % 3 {
fn coin_state(&self) -> Result<CoinState<N>> {
Ok(match self.epoch % 3 {
0 => CoinState::Decided(true),
1 => CoinState::Decided(false),
_ => {
let nonce = Nonce::new(
self.netinfo.invocation_id().as_ref(),
self.session_id,
self.netinfo.node_index(&self.proposer_id).unwrap(),
self.epoch,
);
CoinState::InProgress(Box::new(ThresholdSign::new(self.netinfo.clone(), nonce)))
let coin_id = bincode::serialize(&(&self.session_id, self.epoch))?;
CoinState::InProgress(Box::new(ThresholdSign::new(self.netinfo.clone(), coin_id)))
}
}
})
}
/// Decides on a value and broadcasts a `Term` message with that value.
fn decide(&mut self, b: bool) -> Step<N> {
fn decide(&mut self, b: bool) -> Step<N, S> {
if self.decision.is_some() {
return Step::default();
}
@ -347,13 +338,7 @@ impl<N: NodeIdT> BinaryAgreement<N> {
step.output.push(b);
// Latch the decided state.
self.decision = Some(b);
debug!(
"{:?}/{:?} (is_validator: {}) decision: {}",
self.our_id(),
self.proposer_id,
self.netinfo.is_validator(),
b
);
debug!("{}: decision: {}", self, b);
if self.netinfo.is_validator() {
let msg = MessageContent::Term(b).with_epoch(self.epoch + 1);
step.messages.push(Target::All.message(msg));
@ -362,7 +347,7 @@ impl<N: NodeIdT> BinaryAgreement<N> {
}
/// Checks whether the _N - f_ `Conf` messages have arrived, and if so, activates the coin.
fn try_finish_conf_round(&mut self) -> Result<Step<N>> {
fn try_finish_conf_round(&mut self) -> Result<Step<N, S>> {
if self.conf_values.is_none() || self.count_conf() < self.netinfo.num_correct() {
return Ok(Step::default());
}
@ -382,7 +367,7 @@ impl<N: NodeIdT> BinaryAgreement<N> {
}
/// Increments the epoch, sets the new estimate and handles queued messages.
fn update_epoch(&mut self, b: bool) -> Result<Step<N>> {
fn update_epoch(&mut self, b: bool) -> Result<Step<N, S>> {
self.sbv_broadcast.clear(&self.received_term);
self.received_conf.clear();
for (v, id) in &self.received_term {
@ -390,12 +375,10 @@ impl<N: NodeIdT> BinaryAgreement<N> {
}
self.conf_values = None;
self.epoch += 1;
self.coin_state = self.coin_state();
self.coin_state = self.coin_state()?;
debug!(
"{:?} BinaryAgreement instance {:?} started epoch {}, {} terminated",
self.our_id(),
self.proposer_id,
self.epoch,
"{}: epoch started, {} terminated",
self,
self.received_conf.len(),
);
@ -416,3 +399,20 @@ impl<N: NodeIdT> BinaryAgreement<N> {
Ok(step)
}
}
impl<N: NodeIdT, S: SessionIdT> Display for BinaryAgreement<N, S> {
fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
write!(
f,
"{:?} BA {} epoch {} ({})",
self.our_id(),
self.session_id,
self.epoch,
if self.netinfo.is_validator() {
"validator"
} else {
"observer"
}
)
}
}

View File

@ -68,6 +68,7 @@ mod bool_multimap;
pub mod bool_set;
mod sbv_broadcast;
use bincode;
use rand;
use self::bool_set::BoolSet;
@ -82,14 +83,23 @@ pub enum Error {
HandleThresholdSign(threshold_sign::Error),
#[fail(display = "Error invoking the common coin: {}", _0)]
InvokeCoin(threshold_sign::Error),
#[fail(display = "Unknown proposer")]
UnknownProposer,
// Strings because `io` and `bincode` errors lack `Eq` and `Clone`.
#[fail(display = "Error writing epoch for nonce: {}", _0)]
Io(String),
#[fail(display = "Error serializing session ID for nonce: {}", _0)]
Serialize(String),
}
impl From<bincode::Error> for Error {
fn from(err: bincode::Error) -> Error {
Error::Io(format!("{:?}", err))
}
}
/// An Binary Agreement result.
pub type Result<T> = ::std::result::Result<T, Error>;
pub type Step<N> = ::Step<BinaryAgreement<N>>;
pub type Step<N, T> = ::Step<BinaryAgreement<N, T>>;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum MessageContent {
@ -144,26 +154,3 @@ impl rand::Rand for MessageContent {
}
}
}
#[derive(Clone, Debug)]
struct Nonce(Vec<u8>);
impl Nonce {
pub fn new(
invocation_id: &[u8],
session_id: u64,
proposer_id: usize,
binary_agreement_epoch: u32,
) -> Self {
Nonce(Vec::from(format!(
"Nonce for Honey Badger {:?}@{}:{}:{}",
invocation_id, session_id, binary_agreement_epoch, proposer_id
)))
}
}
impl AsRef<[u8]> for Nonce {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}

View File

@ -91,6 +91,7 @@ where
let max_future_epochs = *max_future_epochs;
let arc_netinfo = Arc::new(netinfo.clone());
let honey_badger = HoneyBadger::builder(arc_netinfo.clone())
.session_id(epoch)
.max_future_epochs(max_future_epochs)
.rng(rng.sub_rng())
.subset_handling_strategy(subset_handling_strategy.clone())

View File

@ -351,6 +351,7 @@ where
let netinfo = Arc::new(self.netinfo.clone());
self.vote_counter = VoteCounter::new(netinfo.clone(), epoch);
self.honey_badger = HoneyBadger::builder(netinfo)
.session_id(epoch)
.max_future_epochs(self.max_future_epochs)
.rng(self.rng.sub_rng())
.build();

View File

@ -14,6 +14,9 @@ use {Contribution, NetworkInfo, NodeIdT};
pub struct HoneyBadgerBuilder<C, N> {
/// Shared network data.
netinfo: Arc<NetworkInfo<N>>,
/// A session identifier. Different session IDs foil replay attacks in two instances with the
/// same epoch numbers and the same validators.
session_id: u64,
/// Start in this epoch.
epoch: u64,
/// The maximum number of future epochs for which we handle messages simultaneously.
@ -35,6 +38,7 @@ where
pub fn new(netinfo: Arc<NetworkInfo<N>>) -> Self {
HoneyBadgerBuilder {
netinfo,
session_id: 0,
epoch: 0,
max_future_epochs: 3,
rng: Box::new(rand::thread_rng()),
@ -49,6 +53,15 @@ where
self
}
/// Sets the session identifier.
///
/// Different session IDs foil replay attacks in two instances with the same epoch numbers and
/// the same validators.
pub fn session_id(&mut self, session_id: u64) -> &mut Self {
self.session_id = session_id;
self
}
/// Sets the starting epoch to the given value.
pub fn epoch(&mut self, epoch: u64) -> &mut Self {
self.epoch = epoch;
@ -74,6 +87,7 @@ where
pub fn build(&mut self) -> HoneyBadger<C, N> {
HoneyBadger {
netinfo: self.netinfo.clone(),
session_id: self.session_id,
epoch: self.epoch,
has_input: false,
epochs: BTreeMap::new(),

View File

@ -1,7 +1,9 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{self, Display};
use std::marker::PhantomData;
use std::mem::replace;
use std::result;
use std::sync::Arc;
use bincode;
@ -15,6 +17,8 @@ use subset::{self as cs, Subset, SubsetOutput};
use threshold_decryption::{self as td, ThresholdDecryption};
use {Contribution, DistAlgorithm, NetworkInfo, NodeIdT};
type CsStep<N> = cs::Step<N, EpochId>;
/// The status of an encrypted contribution.
#[derive(Debug)]
enum DecryptionState<N> {
@ -54,7 +58,7 @@ where
#[derive(Debug)]
enum SubsetState<N: Rand> {
/// The algorithm is ongoing: the set of accepted contributions is still undecided.
Ongoing(Subset<N>),
Ongoing(Subset<N, EpochId>),
/// The algorithm is complete. This contains the set of accepted proposers.
Complete(BTreeSet<N>),
}
@ -64,7 +68,7 @@ where
N: NodeIdT + Rand,
{
/// Provides input to the Subset instance, unless it has already completed.
fn handle_input(&mut self, proposal: Vec<u8>) -> Result<cs::Step<N>> {
fn handle_input(&mut self, proposal: Vec<u8>) -> Result<CsStep<N>> {
match self {
SubsetState::Ongoing(ref mut cs) => cs.handle_input(proposal),
SubsetState::Complete(_) => return Ok(cs::Step::default()),
@ -72,7 +76,7 @@ where
}
/// Handles a message in the Subset instance, unless it has already completed.
fn handle_message(&mut self, sender_id: &N, msg: cs::Message<N>) -> Result<cs::Step<N>> {
fn handle_message(&mut self, sender_id: &N, msg: cs::Message<N>) -> Result<CsStep<N>> {
match self {
SubsetState::Ongoing(ref mut cs) => cs.handle_message(sender_id, msg),
SubsetState::Complete(_) => return Ok(cs::Step::default()),
@ -194,10 +198,12 @@ where
/// Creates a new `Subset` instance.
pub fn new(
netinfo: Arc<NetworkInfo<N>>,
hb_id: u64,
epoch: u64,
subset_handling_strategy: SubsetHandlingStrategy,
) -> Result<Self> {
let cs = Subset::new(netinfo.clone(), epoch).map_err(ErrorKind::CreateSubset)?;
let epoch_id = EpochId { hb_id, epoch };
let cs = Subset::new(netinfo.clone(), &epoch_id).map_err(ErrorKind::CreateSubset)?;
Ok(EpochState {
epoch,
netinfo,
@ -290,7 +296,7 @@ where
}
/// Checks whether the subset has output, and if it does, sends out our decryption shares.
fn process_subset(&mut self, cs_step: cs::Step<N>) -> Result<Step<C, N>> {
fn process_subset(&mut self, cs_step: CsStep<N>) -> Result<Step<C, N>> {
let mut step = Step::default();
let cs_outputs = step.extend_with(cs_step, |cs_msg| {
MessageContent::Subset(cs_msg).with_epoch(self.epoch)
@ -376,3 +382,17 @@ where
}
}
}
/// A session identifier for a `Subset` sub-algorithm run within an epoch. It consists of the epoch
/// number, and an optional `HoneyBadger` session identifier.
#[derive(Clone, Debug, Serialize)]
struct EpochId {
hb_id: u64,
epoch: u64,
}
impl Display for EpochId {
fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
write!(f, "{}/{}", self.hb_id, self.epoch)
}
}

View File

@ -18,6 +18,9 @@ pub use super::epoch_state::SubsetHandlingStrategy;
pub struct HoneyBadger<C, N: Rand> {
/// Shared network data.
pub(super) netinfo: Arc<NetworkInfo<N>>,
/// A session identifier. Different session IDs foil replay attacks in two instances with the
/// same epoch numbers and the same validators.
pub(super) session_id: u64,
/// The earliest epoch from which we have not yet received output.
pub(super) epoch: u64,
/// Whether we have already submitted a proposal for the current epoch.
@ -176,6 +179,7 @@ where
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => entry.insert(EpochState::new(
self.netinfo.clone(),
self.session_id,
epoch,
self.subset_handling_strategy.clone(),
)?),

View File

@ -157,4 +157,4 @@ pub use crypto::pairing;
pub use fault_log::{Fault, FaultKind, FaultLog};
pub use messaging::{SourcedMessage, Target, TargetedMessage};
pub use network_info::NetworkInfo;
pub use traits::{Contribution, DistAlgorithm, Message, NodeIdT, Step};
pub use traits::{Contribution, DistAlgorithm, Message, NodeIdT, SessionIdT, Step};

View File

@ -128,23 +128,13 @@ impl<N: NodeIdT> NetworkInfo<N> {
&self.public_keys
}
/// The index of a node in a canonical numbering of all nodes.
/// The index of a node in a canonical numbering of all nodes. This is the index where the
/// node appears in `all_ids`.
#[inline]
pub fn node_index(&self, id: &N) -> Option<usize> {
self.node_indices.get(id).cloned()
}
/// Returns the unique ID of the Honey Badger invocation.
///
/// FIXME: Using the public key as the invocation ID either requires agreeing on the keys on
/// each invocation, or makes it unsafe to reuse keys for different invocations. A better
/// invocation ID would be one that is distributed to all nodes on each invocation and would be
/// independent from the public key, so that reusing keys would be safer.
#[inline]
pub fn invocation_id(&self) -> Vec<u8> {
self.public_key_set.public_key().to_bytes()
}
/// Returns `true` if this node takes part in the consensus itself. If not, it is only an
/// observer.
#[inline]

View File

@ -23,16 +23,21 @@
//! * Once all `BinaryAgreement` instances have decided, `Subset` returns the set of all proposed
//! values for which the decision was "yes".
use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{self, Display};
use std::result;
use std::sync::Arc;
use hex_fmt::HexFmt;
use binary_agreement::{self, BinaryAgreement};
use binary_agreement;
use broadcast::{self, Broadcast};
use rand::Rand;
use {DistAlgorithm, NetworkInfo, NodeIdT};
use {DistAlgorithm, NetworkInfo, NodeIdT, SessionIdT};
type BaInstance<N, S> = binary_agreement::BinaryAgreement<N, BaSessionId<S>>;
type BaStep<N, S> = binary_agreement::Step<N, BaSessionId<S>>;
/// A subset error.
#[derive(Clone, PartialEq, Debug, Fail)]
@ -56,7 +61,7 @@ pub enum Error {
}
/// A subset result.
pub type Result<T> = ::std::result::Result<T, Error>;
pub type Result<T> = result::Result<T, Error>;
// TODO: Make this a generic argument of `Subset`.
type ProposedValue = Vec<u8>;
@ -66,18 +71,18 @@ type ProposedValue = Vec<u8>;
pub enum Message<N: Rand> {
/// A message for the broadcast algorithm concerning the set element proposed by the given node.
Broadcast(N, broadcast::Message),
/// A message for the Binary Agreement algorithm concerning the set element proposed by the given
/// node.
/// A message for the Binary Agreement algorithm concerning the set element proposed by the
/// given node.
BinaryAgreement(N, binary_agreement::Message),
}
/// Subset algorithm instance
#[derive(Debug)]
pub struct Subset<N: Rand> {
pub struct Subset<N: Rand, S> {
/// Shared network information.
netinfo: Arc<NetworkInfo<N>>,
broadcast_instances: BTreeMap<N, Broadcast<N>>,
ba_instances: BTreeMap<N, BinaryAgreement<N>>,
ba_instances: BTreeMap<N, BaInstance<N, S>>,
/// `None` means that that item has already been output.
broadcast_results: BTreeMap<N, Option<ProposedValue>>,
ba_results: BTreeMap<N, bool>,
@ -85,25 +90,25 @@ pub struct Subset<N: Rand> {
decided: bool,
}
pub type Step<N> = ::Step<Subset<N>>;
pub type Step<N, S> = ::Step<Subset<N, S>>;
impl<N: NodeIdT + Rand> DistAlgorithm for Subset<N> {
impl<N: NodeIdT + Rand, S: SessionIdT> DistAlgorithm for Subset<N, S> {
type NodeId = N;
type Input = ProposedValue;
type Output = SubsetOutput<N>;
type Message = Message<N>;
type Error = Error;
fn handle_input(&mut self, input: Self::Input) -> Result<Step<N>> {
fn handle_input(&mut self, input: Self::Input) -> Result<Step<N, S>> {
self.propose(input)
}
fn handle_message(&mut self, sender_id: &N, message: Message<N>) -> Result<Step<N>> {
fn handle_message(&mut self, sender_id: &N, message: Message<N>) -> Result<Step<N, S>> {
self.handle_message(sender_id, message)
}
fn terminated(&self) -> bool {
self.ba_instances.values().all(BinaryAgreement::terminated)
self.ba_instances.values().all(BaInstance::terminated)
}
fn our_id(&self) -> &Self::NodeId {
@ -117,12 +122,12 @@ pub enum SubsetOutput<N> {
Done,
}
impl<N: NodeIdT + Rand> Subset<N> {
impl<N: NodeIdT + Rand, S: SessionIdT> Subset<N, S> {
/// Creates a new `Subset` instance with the given session identifier.
///
/// If multiple `Subset`s are instantiated within a single network, they must use different
/// session identifiers to foil replay attacks.
pub fn new(netinfo: Arc<NetworkInfo<N>>, session_id: u64) -> Result<Self> {
pub fn new<T: Borrow<S>>(netinfo: Arc<NetworkInfo<N>>, session_id: T) -> Result<Self> {
// Create all broadcast instances.
let mut broadcast_instances: BTreeMap<N, Broadcast<N>> = BTreeMap::new();
for proposer_id in netinfo.all_ids() {
@ -134,12 +139,15 @@ impl<N: NodeIdT + Rand> Subset<N> {
}
// Create all Binary Agreement instances.
let mut ba_instances: BTreeMap<N, BinaryAgreement<N>> = BTreeMap::new();
for proposer_id in netinfo.all_ids() {
let mut ba_instances: BTreeMap<N, BaInstance<N, S>> = BTreeMap::new();
for (proposer_idx, proposer_id) in netinfo.all_ids().enumerate() {
let s_id = BaSessionId {
subset_id: session_id.borrow().clone(),
proposer_idx: proposer_idx as u32,
};
ba_instances.insert(
proposer_id.clone(),
BinaryAgreement::new(netinfo.clone(), session_id, proposer_id.clone())
.map_err(Error::NewBinaryAgreement)?,
BaInstance::new(netinfo.clone(), s_id).map_err(Error::NewBinaryAgreement)?,
);
}
@ -156,7 +164,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
/// Proposes a value for the subset.
///
/// Returns an error if we already made a proposal.
pub fn propose(&mut self, value: ProposedValue) -> Result<Step<N>> {
pub fn propose(&mut self, value: ProposedValue) -> Result<Step<N, S>> {
if !self.netinfo.is_validator() {
return Ok(Step::default());
}
@ -168,7 +176,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
/// Handles a message received from `sender_id`.
///
/// This must be called with every message we receive from another node.
pub fn handle_message(&mut self, sender_id: &N, message: Message<N>) -> Result<Step<N>> {
pub fn handle_message(&mut self, sender_id: &N, message: Message<N>) -> Result<Step<N, S>> {
match message {
Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg),
Message::BinaryAgreement(p_id, a_msg) => {
@ -189,7 +197,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
sender_id: &N,
proposer_id: &N,
bmessage: broadcast::Message,
) -> Result<Step<N>> {
) -> Result<Step<N, S>> {
self.process_broadcast(proposer_id, |bc| bc.handle_message(sender_id, bmessage))
}
@ -200,7 +208,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
sender_id: &N,
proposer_id: &N,
amessage: binary_agreement::Message,
) -> Result<Step<N>> {
) -> Result<Step<N, S>> {
// Send the message to the local instance of Binary Agreement.
self.process_binary_agreement(proposer_id, |binary_agreement| {
binary_agreement.handle_message(sender_id, amessage)
@ -209,7 +217,7 @@ impl<N: NodeIdT + Rand> Subset<N> {
/// Upon delivery of v_j from RBC_j, if input has not yet been provided to
/// BA_j, then provide input 1 to BA_j. See Figure 11.
fn process_broadcast<F>(&mut self, proposer_id: &N, f: F) -> Result<Step<N>>
fn process_broadcast<F>(&mut self, proposer_id: &N, f: F) -> Result<Step<N, S>>
where
F: FnOnce(&mut Broadcast<N>) -> result::Result<broadcast::Step<N>, broadcast::Error>,
{
@ -246,17 +254,16 @@ impl<N: NodeIdT + Rand> Subset<N> {
{
error!("Duplicate insert in broadcast_results: {:?}", inval)
}
let set_binary_agreement_input = |ba: &mut BinaryAgreement<N>| ba.handle_input(true);
Ok(step
.join(self.process_binary_agreement(proposer_id, set_binary_agreement_input)?)
.with_output(self.try_binary_agreement_completion()))
let set_binary_agreement_input = |ba: &mut BaInstance<N, S>| ba.handle_input(true);
step.extend(self.process_binary_agreement(proposer_id, set_binary_agreement_input)?);
Ok(step.with_output(self.try_binary_agreement_completion()))
}
/// Callback to be invoked on receipt of the decision value of the Binary Agreement
/// instance `id`.
fn process_binary_agreement<F>(&mut self, proposer_id: &N, f: F) -> Result<Step<N>>
fn process_binary_agreement<F>(&mut self, proposer_id: &N, f: F) -> Result<Step<N, S>>
where
F: FnOnce(&mut BinaryAgreement<N>) -> binary_agreement::Result<binary_agreement::Step<N>>,
F: FnOnce(&mut BaInstance<N, S>) -> binary_agreement::Result<BaStep<N, S>>,
{
let mut step = Step::default();
let accepted = {
@ -375,3 +382,22 @@ impl<N: NodeIdT + Rand> Subset<N> {
}
}
}
/// A session identifier for a `BinaryAgreement` instance run as a `Subset` sub-algorithm. It
/// consists of the `Subset` instance's own session ID, and the index of the proposer whose
/// contribution this `BinaryAgreement` is about.
#[derive(Clone, Debug, Serialize)]
struct BaSessionId<S> {
subset_id: S,
proposer_idx: u32,
}
impl<S: Display> Display for BaSessionId<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
write!(
f,
"subset {}, proposer #{}",
self.subset_id, self.proposer_idx
)
}
}

View File

@ -1,10 +1,11 @@
//! Common supertraits for distributed algorithms.
use std::fmt::Debug;
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::iter::once;
use failure::Fail;
use serde::Serialize;
use fault_log::{Fault, FaultLog};
use TargetedMessage;
@ -21,6 +22,10 @@ impl<N> NodeIdT for N where N: Eq + Ord + Clone + Debug + Hash + Send + Sync {}
pub trait Message: Debug + Send + Sync {}
impl<M> Message for M where M: Debug + Send + Sync {}
/// Session identifiers.
pub trait SessionIdT: Display + Serialize + Send + Sync + Clone {}
impl<S> SessionIdT for S where S: Display + Serialize + Send + Sync + Clone {}
/// Result of one step of the local state machine of a distributed algorithm. Such a result should
/// be used and never discarded by the client of the algorithm.
#[must_use = "The algorithm step result must be used."]

View File

@ -37,8 +37,8 @@ use hbbft::NetworkInfo;
use network::{Adversary, MessageScheduler, NodeId, SilentAdversary, TestNetwork, TestNode};
fn test_binary_agreement<A: Adversary<BinaryAgreement<NodeId>>>(
mut network: TestNetwork<A, BinaryAgreement<NodeId>>,
fn test_binary_agreement<A: Adversary<BinaryAgreement<NodeId, u8>>>(
mut network: TestNetwork<A, BinaryAgreement<NodeId, u8>>,
input: Option<bool>,
) {
let ids: Vec<NodeId> = network.nodes.keys().cloned().collect();
@ -65,7 +65,7 @@ fn test_binary_agreement<A: Adversary<BinaryAgreement<NodeId>>>(
fn test_binary_agreement_different_sizes<A, F>(new_adversary: F)
where
A: Adversary<BinaryAgreement<NodeId>>,
A: Adversary<BinaryAgreement<NodeId, u8>>,
F: Fn(usize, usize) -> A,
{
// This returns an error in all but the first test.
@ -85,7 +85,7 @@ where
);
let adversary = |_| new_adversary(num_good_nodes, num_faulty_nodes);
let new_ba = |netinfo: Arc<NetworkInfo<NodeId>>| {
BinaryAgreement::new(netinfo, 0, NodeId(0)).expect("Binary Agreement instance")
BinaryAgreement::new(netinfo, 0).expect("Binary Agreement instance")
};
let network = TestNetwork::new(num_good_nodes, num_faulty_nodes, adversary, new_ba);
test_binary_agreement(network, input);

View File

@ -25,8 +25,8 @@ use network::{Adversary, MessageScheduler, NodeId, SilentAdversary, TestNetwork,
type ProposedValue = Vec<u8>;
fn test_subset<A: Adversary<Subset<NodeId>>>(
mut network: TestNetwork<A, Subset<NodeId>>,
fn test_subset<A: Adversary<Subset<NodeId, u8>>>(
mut network: TestNetwork<A, Subset<NodeId, u8>>,
inputs: &BTreeMap<NodeId, ProposedValue>,
) {
let ids: Vec<NodeId> = network.nodes.keys().cloned().collect();
@ -75,9 +75,9 @@ fn new_network<A, F>(
good_num: usize,
bad_num: usize,
adversary: F,
) -> TestNetwork<A, Subset<NodeId>>
) -> TestNetwork<A, Subset<NodeId, u8>>
where
A: Adversary<Subset<NodeId>>,
A: Adversary<Subset<NodeId, u8>>,
F: Fn(BTreeMap<NodeId, Arc<NetworkInfo<NodeId>>>) -> A,
{
// This returns an error in all but the first test.