Rewrite Subset.

This merges the four maps into a single map, each entry of which tracks
the progress of both the `Broadcast` and the `BinaryAgreement`
subalgorithm for an individual proposer. Two advantages:
* A slight optimization, because the `Broadcast` instance is dropped as
soon as it is not needed anymore.
* The static guarantee that certain impossible situations (inserting a
broadcast value twice) cannot happen.

The module is also split up into smaller files.
This commit is contained in:
Andreas Fackler 2018-11-03 12:31:36 +01:00 committed by Andreas Fackler
parent 62cd29e4ca
commit 2456db2d9e
9 changed files with 479 additions and 413 deletions

View File

@ -115,6 +115,11 @@ impl<N: NodeIdT> Broadcast<N> {
}
}
/// Returns the proposer's node ID.
pub fn proposer_id(&self) -> &N {
&self.proposer_id
}
/// Breaks the input value into shards of equal length and encodes them --
/// and some extra parity shards -- with a Reed-Solomon erasure coding
/// scheme. The returned value contains the shard assigned to this

View File

@ -1,411 +0,0 @@
//! # Subset algorithm.
//!
//! The Subset protocol assumes a network of _N_ nodes that send signed
//! messages to each other, with at most _f_ of them malicious, where _3 f < N_. Handling the
//! networking and signing is the responsibility of the user: only when a message has been
//! verified to be "from node i" (e.g. using cryptographic signatures), it can be handed to the
//! `Subset` instance.
//!
//! Each node proposes an element for inclusion. Under the above conditions, the protocol
//! guarantees that all correct nodes output the same set, consisting of at least _N - f_ of the
//! proposed elements.
//!
//! ## How it works
//!
//! * `Subset` instantiates one `Broadcast` algorithm for each of the participating nodes.
//! At least _N - f_ of these - the ones whose proposer is not faulty - will eventually output
//! the element proposed by that node.
//! * It also instantiates Binary Agreement for each participating node, to decide whether
//! that node's proposed element should be included in the set. Whenever an element is
//! received via broadcast, we input "yes" (`true`) into the corresponding `BinaryAgreement` instance.
//! * When _N - f_ `BinaryAgreement` instances have decided "yes", we input "no" (`false`) into the
//! remaining ones, where we haven't provided input yet.
//! * Once all `BinaryAgreement` instances have decided, `Subset` returns the set of all proposed
//! values for which the decision was "yes".
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::{fmt, result};
use failure::Fail;
use hex_fmt::HexFmt;
use log::{debug, error};
use rand_derive::Rand;
use serde_derive::{Deserialize, Serialize};
use binary_agreement;
use broadcast::{self, Broadcast};
use rand::Rand;
use {DistAlgorithm, NetworkInfo, NodeIdT, SessionIdT};
type BaInstance<N, S> = binary_agreement::BinaryAgreement<N, BaSessionId<S>>;
type BaStep<N, S> = binary_agreement::Step<N, BaSessionId<S>>;
/// A subset error.
#[derive(Clone, PartialEq, Debug, Fail)]
pub enum Error {
#[fail(display = "NewBinaryAgreement error: {}", _0)]
NewBinaryAgreement(binary_agreement::Error),
#[fail(display = "ProcessBinaryAgreement0 error: {}", _0)]
ProcessBinaryAgreement0(binary_agreement::Error),
#[fail(display = "ProcessBinaryAgreement1 error: {}", _0)]
ProcessBinaryAgreement1(binary_agreement::Error),
#[fail(display = "NewBroadcast error: {}", _0)]
NewBroadcast(broadcast::Error),
#[fail(display = "ProcessBroadcastBroadcast error: {}", _0)]
ProcessBroadcastBroadcast(broadcast::Error),
#[fail(display = "Multiple Binary Agreement results")]
MultipleBinaryAgreementResults,
#[fail(display = "No such Binary Agreement instance")]
NoSuchBinaryAgreementInstance,
#[fail(display = "No such broadcast instance")]
NoSuchBroadcastInstance,
}
/// A subset result.
pub type Result<T> = result::Result<T, Error>;
/// Message from Subset to remote nodes.
#[derive(Serialize, Deserialize, Clone, Debug, Rand)]
pub enum Message<N: Rand> {
/// A message for the broadcast algorithm concerning the set element proposed by the given node.
Broadcast(N, broadcast::Message),
/// A message for the Binary Agreement algorithm concerning the set element proposed by the
/// given node.
BinaryAgreement(N, binary_agreement::Message),
}
/// Subset algorithm instance
#[derive(Debug)]
pub struct Subset<N: Rand, S> {
/// Shared network information.
netinfo: Arc<NetworkInfo<N>>,
session_id: S,
broadcast_instances: BTreeMap<N, Broadcast<N>>,
ba_instances: BTreeMap<N, BaInstance<N, S>>,
/// `None` means that that item has already been output.
broadcast_results: BTreeMap<N, Option<Vec<u8>>>,
ba_results: BTreeMap<N, bool>,
/// Whether the instance has decided on a value.
decided: bool,
}
pub type Step<N, S> = ::Step<Subset<N, S>>;
impl<N: NodeIdT + Rand, S: SessionIdT> DistAlgorithm for Subset<N, S> {
type NodeId = N;
type Input = Vec<u8>;
type Output = SubsetOutput<N>;
type Message = Message<N>;
type Error = Error;
fn handle_input(&mut self, input: Self::Input) -> Result<Step<N, S>> {
self.propose(input)
}
fn handle_message(&mut self, sender_id: &N, message: Message<N>) -> Result<Step<N, S>> {
self.handle_message(sender_id, message)
}
fn terminated(&self) -> bool {
self.ba_instances.values().all(BaInstance::terminated)
}
fn our_id(&self) -> &Self::NodeId {
self.netinfo.our_id()
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SubsetOutput<N> {
Contribution(N, Vec<u8>),
Done,
}
impl<N: NodeIdT + Rand, S: SessionIdT> Subset<N, S> {
/// Creates a new `Subset` instance with the given session identifier.
///
/// If multiple `Subset`s are instantiated within a single network, they must use different
/// session identifiers to foil replay attacks.
pub fn new(netinfo: Arc<NetworkInfo<N>>, session_id: S) -> Result<Self> {
// Create all broadcast instances.
let mut broadcast_instances: BTreeMap<N, Broadcast<N>> = BTreeMap::new();
for proposer_id in netinfo.all_ids() {
broadcast_instances.insert(
proposer_id.clone(),
Broadcast::new(netinfo.clone(), proposer_id.clone())
.map_err(Error::NewBroadcast)?,
);
}
// Create all Binary Agreement instances.
let mut ba_instances: BTreeMap<N, BaInstance<N, S>> = BTreeMap::new();
for (proposer_idx, proposer_id) in netinfo.all_ids().enumerate() {
let s_id = BaSessionId {
subset_id: session_id.clone(),
proposer_idx: proposer_idx as u32,
};
ba_instances.insert(
proposer_id.clone(),
BaInstance::new(netinfo.clone(), s_id).map_err(Error::NewBinaryAgreement)?,
);
}
Ok(Subset {
netinfo,
session_id,
broadcast_instances,
ba_instances,
broadcast_results: BTreeMap::new(),
ba_results: BTreeMap::new(),
decided: false,
})
}
/// Proposes a value for the subset.
///
/// Returns an error if we already made a proposal.
pub fn propose(&mut self, value: Vec<u8>) -> Result<Step<N, S>> {
if !self.netinfo.is_validator() {
return Ok(Step::default());
}
debug!("{} proposing {:0.10}", self, HexFmt(&value));
let id = self.our_id().clone();
self.process_broadcast(&id, |bc| bc.handle_input(value))
}
/// Handles a message received from `sender_id`.
///
/// This must be called with every message we receive from another node.
pub fn handle_message(&mut self, sender_id: &N, message: Message<N>) -> Result<Step<N, S>> {
match message {
Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg),
Message::BinaryAgreement(p_id, a_msg) => {
self.handle_binary_agreement(sender_id, &p_id, a_msg)
}
}
}
/// Returns the number of validators from which we have already received a proposal.
pub(crate) fn received_proposals(&self) -> usize {
self.broadcast_results.len()
}
/// Receives a broadcast message from a remote node `sender_id` concerning a
/// value proposed by the node `proposer_id`.
fn handle_broadcast(
&mut self,
sender_id: &N,
proposer_id: &N,
bmessage: broadcast::Message,
) -> Result<Step<N, S>> {
self.process_broadcast(proposer_id, |bc| bc.handle_message(sender_id, bmessage))
}
/// Receives a Binary Agreement message from a remote node `sender_id` concerning
/// a value proposed by the node `proposer_id`.
fn handle_binary_agreement(
&mut self,
sender_id: &N,
proposer_id: &N,
amessage: binary_agreement::Message,
) -> Result<Step<N, S>> {
// Send the message to the local instance of Binary Agreement.
self.process_binary_agreement(proposer_id, |binary_agreement| {
binary_agreement.handle_message(sender_id, amessage)
})
}
/// Upon delivery of v_j from RBC_j, if input has not yet been provided to
/// BA_j, then provide input 1 to BA_j. See Figure 11.
fn process_broadcast<F>(&mut self, proposer_id: &N, f: F) -> Result<Step<N, S>>
where
F: FnOnce(&mut Broadcast<N>) -> result::Result<broadcast::Step<N>, broadcast::Error>,
{
let mut step = Step::default();
let value = {
let broadcast = self
.broadcast_instances
.get_mut(proposer_id)
.ok_or(Error::NoSuchBroadcastInstance)?;
let to_msg = |b_msg| Message::Broadcast(proposer_id.clone(), b_msg);
let output = step.extend_with(
f(broadcast).map_err(Error::ProcessBroadcastBroadcast)?,
to_msg,
);
if let Some(output) = output.into_iter().next() {
output
} else {
return Ok(step);
}
};
let val_to_insert = if let Some(true) = self.ba_results.get(proposer_id) {
debug!("{} {:?} → {:0.10}", self, proposer_id, HexFmt(&value));
step.output
.push(SubsetOutput::Contribution(proposer_id.clone(), value));
None
} else {
Some(value)
};
if let Some(inval) = self
.broadcast_results
.insert(proposer_id.clone(), val_to_insert)
{
// TODO: Merge `broadcast_instances` and `broadcast_results` into one map. The value
// type should be an enum: either an instance, or a result. Then this would be
// statically impossible.
error!(
"Duplicate insert in broadcast_results: {:?}",
inval.map(HexFmt)
)
}
let set_binary_agreement_input = |ba: &mut BaInstance<N, S>| ba.handle_input(true);
step.extend(self.process_binary_agreement(proposer_id, set_binary_agreement_input)?);
Ok(step.with_output(self.try_binary_agreement_completion()))
}
/// Callback to be invoked on receipt of the decision value of the Binary Agreement
/// instance `id`.
fn process_binary_agreement<F>(&mut self, proposer_id: &N, f: F) -> Result<Step<N, S>>
where
F: FnOnce(&mut BaInstance<N, S>) -> binary_agreement::Result<BaStep<N, S>>,
{
let mut step = Step::default();
let accepted = {
let binary_agreement = self
.ba_instances
.get_mut(proposer_id)
.ok_or(Error::NoSuchBinaryAgreementInstance)?;
if binary_agreement.terminated() {
return Ok(step);
}
let to_msg = |a_msg| Message::BinaryAgreement(proposer_id.clone(), a_msg);
let output = step.extend_with(
f(binary_agreement).map_err(Error::ProcessBinaryAgreement0)?,
to_msg,
);
if let Some(accepted) = output.into_iter().next() {
accepted
} else {
return Ok(step);
}
};
// Binary agreement result accepted.
if self
.ba_results
.insert(proposer_id.clone(), accepted)
.is_some()
{
return Err(Error::MultipleBinaryAgreementResults);
}
debug!(
"{} updated Binary Agreement results: {:?}",
self, self.ba_results
);
if accepted {
if self.count_true() == self.netinfo.num_correct() {
// Upon delivery of value 1 from at least N f instances of BA, provide
// input 0 to each instance of BA that has not yet been provided input.
for (id, binary_agreement) in &mut self.ba_instances {
let to_msg = |a_msg| Message::BinaryAgreement(id.clone(), a_msg);
for output in step.extend_with(
binary_agreement
.handle_input(false)
.map_err(Error::ProcessBinaryAgreement1)?,
to_msg,
) {
if self.ba_results.insert(id.clone(), output).is_some() {
return Err(Error::MultipleBinaryAgreementResults);
}
}
}
}
if let Some(value) = self
.broadcast_results
.get_mut(proposer_id)
.and_then(Option::take)
{
debug!("{} {:?} → {:0.10}", self, proposer_id, HexFmt(&value));
step.output
.push(SubsetOutput::Contribution(proposer_id.clone(), value));
}
}
Ok(step.with_output(self.try_binary_agreement_completion()))
}
/// Returns the number of Binary Agreement instances that have decided "yes".
fn count_true(&self) -> usize {
self.ba_results.values().filter(|v| **v).count()
}
fn try_binary_agreement_completion(&mut self) -> Option<SubsetOutput<N>> {
if self.decided || self.count_true() < self.netinfo.num_correct() {
return None;
}
// Once all instances of BA have completed, let C ⊂ [1..N] be
// the indexes of each BA that delivered 1. Wait for the output
// v_j for each RBC_j such that j∈C. Finally output j∈C v_j.
if self.ba_results.len() < self.netinfo.num_nodes() {
return None;
}
debug!("{}: All Binary Agreement instances have terminated.", self);
// All instances of BinaryAgreement that delivered `true` (or "1" in the paper).
let delivered_1: BTreeSet<&N> = self
.ba_results
.iter()
.filter(|(_, v)| **v)
.map(|(k, _)| k)
.collect();
debug!(
"{}: Binary Agreement instances that delivered `true`: {:?}",
self, delivered_1
);
// Results of Broadcast instances in `delivered_1`
let broadcast_results: BTreeSet<&N> = self
.broadcast_results
.iter()
.filter(|(k, _)| delivered_1.contains(k))
.map(|(k, _)| k)
.collect();
if delivered_1.len() == broadcast_results.len() {
debug!("{}: All Binary Agreement instances completed.", self);
self.decided = true;
Some(SubsetOutput::Done)
} else {
None
}
}
}
impl<N: NodeIdT + Rand, S: SessionIdT> fmt::Display for Subset<N, S> {
fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
write!(f, "{:?} Subset({})", self.our_id(), self.session_id)
}
}
/// A session identifier for a `BinaryAgreement` instance run as a `Subset` sub-algorithm. It
/// consists of the `Subset` instance's own session ID, and the index of the proposer whose
/// contribution this `BinaryAgreement` is about.
#[derive(Clone, Debug, Serialize)]
struct BaSessionId<S> {
subset_id: S,
proposer_idx: u32,
}
impl<S: fmt::Display> fmt::Display for BaSessionId<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
write!(
f,
"subset {}, proposer #{}",
self.subset_id, self.proposer_idx
)
}
}

27
src/subset/error.rs Normal file
View File

@ -0,0 +1,27 @@
use failure::Fail;
use std::result;
use binary_agreement;
use broadcast;
/// A subset error.
#[derive(Clone, PartialEq, Debug, Fail)]
pub enum Error {
#[fail(display = "Error creating BinaryAgreement: {}", _0)]
NewAgreement(binary_agreement::Error),
#[fail(display = "Error creating Broadcast: {}", _0)]
NewBroadcast(broadcast::Error),
#[fail(display = "Error handling Broadcast input/message: {}", _0)]
HandleBroadcast(broadcast::Error),
#[fail(
display = "Error handling BinaryAgreement input/message: {}",
_0
)]
HandleAgreement(binary_agreement::Error),
#[fail(display = "Unknown proposer ID")]
UnknownProposer,
}
/// A subset result.
pub type Result<T> = result::Result<T, Error>;

33
src/subset/message.rs Normal file
View File

@ -0,0 +1,33 @@
use rand::Rand;
use rand_derive::Rand;
use serde_derive::{Deserialize, Serialize};
use binary_agreement;
use broadcast;
/// Message from Subset to remote nodes.
#[derive(Serialize, Deserialize, Clone, Debug, Rand)]
pub struct Message<N: Rand> {
/// The proposer whose contribution this message is about.
pub proposer_id: N,
/// The wrapped broadcast or agreement message.
pub content: MessageContent,
}
/// A message about a particular proposer's contribution.
#[derive(Serialize, Deserialize, Clone, Debug, Rand)]
pub enum MessageContent {
/// A wrapped message for the broadcast instance, to deliver the proposed value.
Broadcast(broadcast::Message),
/// A wrapped message for the agreement instance, to decide on whether to accept the value.
Agreement(binary_agreement::Message),
}
impl MessageContent {
pub fn with<N: Rand>(self, proposer_id: N) -> Message<N> {
Message {
proposer_id,
content: self,
}
}
}

33
src/subset/mod.rs Normal file
View File

@ -0,0 +1,33 @@
//! # Subset algorithm.
//!
//! The Subset protocol assumes a network of _N_ nodes that send signed
//! messages to each other, with at most _f_ of them malicious, where _3 f < N_. Handling the
//! networking and signing is the responsibility of the user: only when a message has been
//! verified to be "from node i" (e.g. using cryptographic signatures), it can be handed to the
//! `Subset` instance.
//!
//! Each node proposes an element for inclusion. Under the above conditions, the protocol
//! guarantees that all correct nodes output the same set, consisting of at least _N - f_ of the
//! proposed elements.
//!
//! ## How it works
//!
//! * `Subset` instantiates one `Broadcast` algorithm for each of the participating nodes.
//! At least _N - f_ of these - the ones whose proposer is not faulty - will eventually output
//! the element proposed by that node.
//! * It also instantiates Binary Agreement for each participating node, to decide whether
//! that node's proposed element should be included in the set. Whenever an element is
//! received via broadcast, we input "yes" (`true`) into the corresponding `BinaryAgreement` instance.
//! * When _N - f_ `BinaryAgreement` instances have decided "yes", we input "no" (`false`) into the
//! remaining ones, where we haven't provided input yet.
//! * Once all `BinaryAgreement` instances have decided, `Subset` returns the set of all proposed
//! values for which the decision was "yes".
mod error;
mod message;
mod proposal_state;
mod subset;
pub use self::error::{Error, Result};
pub use self::message::{Message, MessageContent};
pub use self::subset::{Step, Subset, SubsetOutput};

View File

@ -0,0 +1,188 @@
use std::mem;
use std::sync::Arc;
use super::subset::BaSessionId;
use super::{Error, MessageContent, Result};
use binary_agreement;
use broadcast::{self, Broadcast};
use rand::Rand;
use {DistAlgorithm, NetworkInfo, NodeIdT, SessionIdT};
type BaInstance<N, S> = binary_agreement::BinaryAgreement<N, BaSessionId<S>>;
type ValueAndStep<N, S> = (Option<Vec<u8>>, Step<N, S>);
type BaResult<N, S> = binary_agreement::Result<binary_agreement::Step<N, BaSessionId<S>>>;
pub type Step<N, S> = ::Step<ProposalState<N, S>>;
/// The state of a proposal's broadcast and agreement process.
#[derive(Debug)]
pub enum ProposalState<N: Rand, S> {
/// We are still awaiting the value from the `Broadcast` protocol and the decision from
/// `BinaryAgreement`.
Ongoing(Broadcast<N>, BaInstance<N, S>),
/// We received the value but are still waiting for `BinaryAgreement`, whether to output.
HasValue(Vec<u8>, BaInstance<N, S>),
/// The values has been accepted, but we haven't received it yet.
Accepted(Broadcast<N>),
/// We are done: either we output (`true`) or we dropped the value (`false`).
Complete(bool),
}
impl<N: NodeIdT + Rand, S: SessionIdT> DistAlgorithm for ProposalState<N, S> {
type NodeId = N;
type Input = Vec<u8>;
type Output = Vec<u8>;
type Message = MessageContent;
type Error = Error;
fn handle_input(&mut self, input: Self::Input) -> Result<Step<N, S>> {
self.propose(input)
}
fn handle_message(&mut self, sender_id: &N, message: MessageContent) -> Result<Step<N, S>> {
self.handle_message(sender_id, message)
}
fn terminated(&self) -> bool {
self.complete()
}
fn our_id(&self) -> &Self::NodeId {
unreachable!() // We don't actually need `DistAlgorithm`, just `Step`.
}
}
impl<N: NodeIdT + Rand, S: SessionIdT> ProposalState<N, S> {
/// Creates a new `ProposalState::Ongoing`, with a fresh broadcast and agreement instance.
pub fn new(netinfo: Arc<NetworkInfo<N>>, ba_id: BaSessionId<S>, prop_id: N) -> Result<Self> {
let agreement = BaInstance::new(netinfo.clone(), ba_id).map_err(Error::NewAgreement)?;
let broadcast = Broadcast::new(netinfo, prop_id).map_err(Error::NewBroadcast)?;
Ok(ProposalState::Ongoing(broadcast, agreement))
}
/// Returns `true` if we already received the `Broadcast` result.
pub fn received(&self) -> bool {
match self {
ProposalState::Ongoing(_, _) | ProposalState::Accepted(_) => false,
ProposalState::HasValue(_, _) => true,
ProposalState::Complete(accepted) => *accepted,
}
}
/// Returns `true` if this proposal has been accepted, even if we don't have the value yet.
pub fn accepted(&self) -> bool {
match self {
ProposalState::Ongoing(_, _) | ProposalState::HasValue(_, _) => false,
ProposalState::Accepted(_) => true,
ProposalState::Complete(accepted) => *accepted,
}
}
/// Returns `true` if this proposal has been rejected, or accepted and output.
pub fn complete(&self) -> bool {
match self {
ProposalState::Ongoing(_, _)
| ProposalState::HasValue(_, _)
| ProposalState::Accepted(_) => false,
ProposalState::Complete(_) => true,
}
}
/// Makes a proposal by broadcasting a value.
pub fn propose(&mut self, value: Vec<u8>) -> Result<Step<N, S>> {
self.transition(|state| state.handle_broadcast(|bc| bc.broadcast(value)))
}
/// Handles a message received from `sender_id`.
pub fn handle_message(&mut self, sender_id: &N, msg: MessageContent) -> Result<Step<N, S>> {
self.transition(|state| match msg {
MessageContent::Agreement(ba_msg) => {
state.handle_agreement(|ba| ba.handle_message(sender_id, ba_msg))
}
MessageContent::Broadcast(bc_msg) => {
state.handle_broadcast(|bc| bc.handle_message(sender_id, bc_msg))
}
})
}
/// Votes for rejecting the proposal, if still possible.
pub fn vote_false(&mut self) -> Result<Step<N, S>> {
self.transition(|state| state.handle_agreement(|ba| ba.propose(false)))
}
/// Applies `f` to the `Broadcast` instance, and updates the state according to the outcome.
fn handle_broadcast<F>(self, f: F) -> (Self, Result<Step<N, S>>)
where
F: FnOnce(&mut Broadcast<N>) -> broadcast::Result<broadcast::Step<N>>,
{
use self::ProposalState::*;
match self {
Ongoing(mut bc, ba) => match Self::convert_bc(f(&mut bc)) {
Err(err) => (Ongoing(bc, ba), Err(err)),
Ok((None, step)) => (Ongoing(bc, ba), Ok(step)),
Ok((Some(value), step)) => {
let state = HasValue(value, ba);
let (state, result) = state.handle_agreement(|ba| ba.propose(true));
(state, result.map(|vote_step| step.join(vote_step)))
}
},
Accepted(mut bc) => match Self::convert_bc(f(&mut bc)) {
Err(err) => (Accepted(bc), Err(err)),
Ok((None, step)) => (Accepted(bc), Ok(step)),
Ok((Some(value), step)) => (Complete(true), Ok(step.with_output(value))),
},
state @ HasValue(_, _) | state @ Complete(_) => (state, Ok(Step::default())),
}
}
/// Applies `f` to the `BinaryAgreement` instance, and updates the state according to the
/// outcome.
fn handle_agreement<F>(self, f: F) -> (Self, Result<Step<N, S>>)
where
F: FnOnce(&mut BaInstance<N, S>) -> BaResult<N, S>,
{
use self::ProposalState::*;
match self {
Ongoing(bc, mut ba) => match Self::convert_ba(f(&mut ba)) {
Err(err) => (Ongoing(bc, ba), Err(err)),
Ok((None, step)) => (Ongoing(bc, ba), Ok(step)),
Ok((Some(false), step)) => (Complete(false), Ok(step)),
Ok((Some(true), step)) => (Accepted(bc), Ok(step)),
},
HasValue(value, mut ba) => match Self::convert_ba(f(&mut ba)) {
Err(err) => (HasValue(value, ba), Err(err)),
Ok((None, step)) => (HasValue(value, ba), Ok(step)),
Ok((Some(false), step)) => (Complete(false), Ok(step)),
Ok((Some(true), step)) => (Complete(true), Ok(step.with_output(value))),
},
state @ Accepted(_) | state @ Complete(_) => (state, Ok(Step::default())),
}
}
/// Converts a `Broadcast` result and returns the output, if there was one.
fn convert_bc(result: broadcast::Result<broadcast::Step<N>>) -> Result<ValueAndStep<N, S>> {
let bc_step = result.map_err(Error::HandleBroadcast)?;
let mut step = Step::default();
let opt_value = step.extend_with(bc_step, MessageContent::Broadcast).pop();
Ok((opt_value, step))
}
/// Converts a `BinaryAgreement` step and returns the output, if there was one.
fn convert_ba(result: BaResult<N, S>) -> Result<(Option<bool>, Step<N, S>)> {
let ba_step = result.map_err(Error::HandleAgreement)?;
let mut step = Step::default();
let opt_decision = step.extend_with(ba_step, MessageContent::Agreement).pop();
Ok((opt_decision, step))
}
/// Applies the given transition to `self`.
fn transition<F>(&mut self, f: F) -> Result<Step<N, S>>
where
F: FnOnce(Self) -> (Self, Result<Step<N, S>>),
{
// Temporary value: We need to take ownership of the state to make it transition.
let (new_state, result) = f(mem::replace(self, ProposalState::Complete(false)));
*self = new_state;
result
}
}

185
src/subset/subset.rs Normal file
View File

@ -0,0 +1,185 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use std::{fmt, result};
use derivative::Derivative;
use hex_fmt::HexFmt;
use log::debug;
use serde_derive::Serialize;
use super::proposal_state::{ProposalState, Step as ProposalStep};
use super::{Error, Message, MessageContent, Result};
use rand::Rand;
use {util, DistAlgorithm, NetworkInfo, NodeIdT, SessionIdT};
pub type Step<N, S> = ::Step<Subset<N, S>>;
#[derive(Derivative, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derivative(Debug)]
pub enum SubsetOutput<N> {
Contribution(
N,
#[derivative(Debug(format_with = "util::fmt_hex"))] Vec<u8>,
),
Done,
}
/// Subset algorithm instance
#[derive(Debug)]
pub struct Subset<N: Rand, S> {
/// Shared network information.
netinfo: Arc<NetworkInfo<N>>,
/// The session identifier.
session_id: S,
/// A map that assigns to each validator the progress of their contribution.
proposal_states: BTreeMap<N, ProposalState<N, S>>,
/// Whether the instance has decided on a value.
decided: bool,
}
impl<N: NodeIdT + Rand, S: SessionIdT> DistAlgorithm for Subset<N, S> {
type NodeId = N;
type Input = Vec<u8>;
type Output = SubsetOutput<N>;
type Message = Message<N>;
type Error = Error;
fn handle_input(&mut self, input: Self::Input) -> Result<Step<N, S>> {
self.propose(input)
}
fn handle_message(&mut self, sender_id: &N, message: Message<N>) -> Result<Step<N, S>> {
self.handle_message(sender_id, message)
}
fn terminated(&self) -> bool {
self.decided
}
fn our_id(&self) -> &Self::NodeId {
self.netinfo.our_id()
}
}
impl<N: NodeIdT + Rand, S: SessionIdT> Subset<N, S> {
/// Creates a new `Subset` instance with the given session identifier.
///
/// If multiple `Subset`s are instantiated within a single network, they must use different
/// session identifiers to foil replay attacks.
pub fn new(netinfo: Arc<NetworkInfo<N>>, session_id: S) -> Result<Self> {
let mut proposal_states = BTreeMap::new();
for (proposer_idx, proposer_id) in netinfo.all_ids().enumerate() {
let ba_id = BaSessionId {
subset_id: session_id.clone(),
proposer_idx: proposer_idx as u32,
};
proposal_states.insert(
proposer_id.clone(),
ProposalState::new(netinfo.clone(), ba_id, proposer_id.clone())?,
);
}
Ok(Subset {
netinfo,
session_id,
proposal_states,
decided: false,
})
}
/// Proposes a value for the subset.
///
/// Returns an error if we already made a proposal.
pub fn propose(&mut self, value: Vec<u8>) -> Result<Step<N, S>> {
if !self.netinfo.is_validator() {
return Ok(Step::default());
}
debug!("{} proposing {:0.10}", self, HexFmt(&value));
let prop_step = self
.proposal_states
.get_mut(self.netinfo.our_id())
.ok_or(Error::UnknownProposer)?
.propose(value)?;
let step = Self::convert_step(self.netinfo.our_id(), prop_step);
Ok(step.join(self.try_output()?))
}
/// Handles a message received from `sender_id`.
///
/// This must be called with every message we receive from another node.
pub fn handle_message(&mut self, sender_id: &N, msg: Message<N>) -> Result<Step<N, S>> {
let prop_step = self
.proposal_states
.get_mut(&msg.proposer_id)
.ok_or(Error::UnknownProposer)?
.handle_message(sender_id, msg.content)?;
let step = Self::convert_step(&msg.proposer_id, prop_step);
Ok(step.join(self.try_output()?))
}
/// Returns the number of validators from which we have already received a proposal.
pub fn received_proposals(&self) -> usize {
let received = |state: &&ProposalState<N, S>| state.received();
self.proposal_states.values().filter(received).count()
}
fn convert_step(proposer_id: &N, prop_step: ProposalStep<N, S>) -> Step<N, S> {
let from_p_msg = |p_msg: MessageContent| p_msg.with(proposer_id.clone());
let mut step = Step::default();
if let Some(value) = step.extend_with(prop_step, from_p_msg).pop() {
let contribution = SubsetOutput::Contribution(proposer_id.clone(), value);
step.output.push(contribution);
}
step
}
/// Returns the number of Binary Agreement instances that have decided "yes".
fn count_accepted(&self) -> usize {
let accepted = |state: &&ProposalState<N, S>| state.accepted();
self.proposal_states.values().filter(accepted).count()
}
/// Checks the voting and termination conditions: If enough proposals have been accepted, votes
/// "no" for the remaining ones. If all proposals have been decided, outputs `Done`.
fn try_output(&mut self) -> Result<Step<N, S>> {
if self.decided || self.count_accepted() < self.netinfo.num_correct() {
return Ok(Step::default());
}
let mut step = Step::default();
if self.count_accepted() == self.netinfo.num_correct() {
for (proposer_id, state) in &mut self.proposal_states {
step.extend(Self::convert_step(proposer_id, state.vote_false()?));
}
}
if self.proposal_states.values().all(ProposalState::complete) {
self.decided = true;
step.output.push(SubsetOutput::Done);
}
Ok(step)
}
}
impl<N: NodeIdT + Rand, S: SessionIdT> fmt::Display for Subset<N, S> {
fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
write!(f, "{:?} Subset({})", self.our_id(), self.session_id)
}
}
/// A session identifier for a `BinaryAgreement` instance run as a `Subset` sub-algorithm. It
/// consists of the `Subset` instance's own session ID, and the index of the proposer whose
/// contribution this `BinaryAgreement` is about.
#[derive(Clone, Debug, Serialize)]
pub struct BaSessionId<S> {
subset_id: S,
proposer_idx: u32,
}
impl<S: fmt::Display> fmt::Display for BaSessionId<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
write!(
f,
"subset {}, proposer #{}",
self.subset_id, self.proposer_idx
)
}
}

View File

@ -26,8 +26,8 @@ pub trait Message: Debug + Send + Sync {}
impl<M> Message for M where M: Debug + Send + Sync {}
/// Session identifiers.
pub trait SessionIdT: Display + Serialize + Send + Sync + Clone {}
impl<S> SessionIdT for S where S: Display + Serialize + Send + Sync + Clone {}
pub trait SessionIdT: Display + Serialize + Send + Sync + Clone + Debug {}
impl<S> SessionIdT for S where S: Display + Serialize + Send + Sync + Clone + Debug {}
/// Epochs.
pub trait EpochT: Copy + Message + Default + Eq + Ord + Serialize + DeserializeOwned {}

View File

@ -5,6 +5,7 @@
use std::fmt;
use hex_fmt::HexFmt;
use rand;
/// Workaround trait for creating new random number generators
@ -29,3 +30,8 @@ where
pub fn fmt_rng<T>(_: T, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("<RNG>")
}
/// Prints a byte slice as shortened hexadecimal in debug output.
pub fn fmt_hex<T: AsRef<[u8]>>(bytes: T, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:10}", HexFmt(bytes))
}