introduced common shared network information

This commit is contained in:
Vladimir Komendantskiy 2018-05-29 13:17:30 +01:00
parent 332d5fbbe3
commit d09f3e26b4
10 changed files with 222 additions and 148 deletions

View File

@ -6,11 +6,12 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug;
use std::hash::Hash;
use std::mem::replace;
use std::rc::Rc;
use itertools::Itertools;
use agreement::bin_values::BinValues;
use messaging::{DistAlgorithm, Target, TargetedMessage};
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
error_chain!{
types {
@ -67,10 +68,9 @@ impl AgreementMessage {
/// Binary Agreement instance
pub struct Agreement<NodeUid> {
/// This node's ID.
uid: NodeUid,
num_nodes: usize,
num_faulty_nodes: usize,
/// Shared network information.
netinfo: Rc<NetworkInfo<NodeUid>>,
/// Agreement algorithm epoch.
epoch: u32,
/// Bin values. Reset on every epoch update.
bin_values: BinValues,
@ -162,18 +162,14 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for Agreement<NodeU
}
fn our_id(&self) -> &Self::NodeUid {
&self.uid
self.netinfo.our_uid()
}
}
impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
pub fn new(uid: NodeUid, num_nodes: usize) -> Self {
let num_faulty_nodes = (num_nodes - 1) / 3;
pub fn new(netinfo: Rc<NetworkInfo<NodeUid>>) -> Self {
Agreement {
uid,
num_nodes,
num_faulty_nodes,
netinfo,
epoch: 0,
bin_values: BinValues::new(),
received_bval: BTreeMap::new(),
@ -195,7 +191,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
if self.epoch != 0 || self.estimated.is_some() {
return Err(ErrorKind::InputNotAccepted.into());
}
if self.num_nodes == 1 {
if self.netinfo.num_nodes() == 1 {
self.decision = Some(input);
self.output = Some(input);
self.terminated = true;
@ -225,7 +221,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
// upon receiving `BVal(b)` messages from 2f + 1 nodes,
// bin_values := bin_values {b}
if count_bval == 2 * self.num_faulty_nodes + 1 {
if count_bval == 2 * self.netinfo.num_faulty() + 1 {
let previous_bin_values = self.bin_values;
let bin_values_changed = self.bin_values.insert(b);
@ -241,7 +237,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
} else {
Ok(())
}
} else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) {
} else if count_bval == self.netinfo.num_faulty() + 1 && !self.sent_bval.contains(&b) {
// upon receiving `BVal(b)` messages from f + 1 nodes, if
// `BVal(b)` has not been sent, multicast `BVal(b)`
self.send_bval(b)
@ -257,8 +253,8 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
self.messages
.push_back(AgreementMessage::bval(self.epoch, b));
// Receive the `BVal` message locally.
let our_uid = self.uid.clone();
self.handle_bval(&our_uid, b)
let our_uid = &self.netinfo.our_uid().clone();
self.handle_bval(our_uid, b)
}
fn send_conf(&mut self) -> AgreementResult<()> {
@ -274,8 +270,8 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
// Trigger the start of the `Conf` round.
self.conf_round = true;
// Receive the `Conf` message locally.
let our_uid = self.uid.clone();
self.handle_conf(&our_uid, v)
let our_uid = &self.netinfo.our_uid().clone();
self.handle_conf(our_uid, v)
}
/// Waits until at least (N f) `Aux` messages have been received, such that
@ -292,7 +288,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
if self.bin_values == BinValues::None {
return Ok(());
}
if self.count_aux() < self.num_nodes - self.num_faulty_nodes {
if self.count_aux() < self.netinfo.num_nodes() - self.netinfo.num_faulty() {
// Continue waiting for the (N - f) `Aux` messages.
return Ok(());
}
@ -308,7 +304,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
fn try_finish_conf_round(&mut self) -> AgreementResult<()> {
if self.conf_round {
let (count_vals, vals) = self.count_conf();
if count_vals < self.num_nodes - self.num_faulty_nodes {
if count_vals < self.netinfo.num_nodes() - self.netinfo.num_faulty() {
// Continue waiting for (N - f) `Conf` messages
return Ok(());
}
@ -323,8 +319,8 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
self.messages
.push_back(AgreementMessage::aux(self.epoch, b));
// Receive the `Aux` message locally.
let our_uid = self.uid.clone();
self.handle_aux(&our_uid, b)
let our_uid = &self.netinfo.our_uid().clone();
self.handle_aux(our_uid, b)
}
/// The count of `Aux` messages such that the set of values carried by those messages is a
@ -365,7 +361,11 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
/// optional decision value. The function may start the next epoch. In that case, it also
/// returns a message for broadcast.
fn invoke_coin(&mut self, vals: BinValues) -> AgreementResult<()> {
debug!("{:?} invoke_coin in epoch {}", self.uid, self.epoch);
debug!(
"{:?} invoke_coin in epoch {}",
self.netinfo.our_uid(),
self.epoch
);
// FIXME: Implement the Common Coin algorithm. At the moment the
// coin value is common across different nodes but not random.
let coin = (self.epoch % 2) == 0;
@ -375,14 +375,15 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
// some round r' > r."
self.terminated = self.terminated || self.decision == Some(coin);
if self.terminated {
debug!("Agreement instance {:?} terminated", self.uid);
debug!("Agreement instance {:?} terminated", self.netinfo.our_uid());
return Ok(());
}
self.start_next_epoch();
debug!(
"Agreement instance {:?} started epoch {}",
self.uid, self.epoch
self.netinfo.our_uid(),
self.epoch
);
if let Some(b) = vals.definite() {
@ -393,7 +394,11 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
self.output = Some(b);
// Latch the decided state.
self.decision = Some(b);
debug!("Agreement instance {:?} output: {}", self.uid, b);
debug!(
"Agreement instance {:?} output: {}",
self.netinfo.our_uid(),
b
);
}
} else {
self.estimated = Some(coin);

View File

@ -1,13 +1,15 @@
use std::collections::{BTreeMap, VecDeque};
use std::fmt::{self, Debug};
use std::iter::once;
use std::rc::Rc;
use fmt::{HexBytes, HexList, HexProof};
use merkle::{MerkleTree, Proof};
use reed_solomon_erasure as rse;
use reed_solomon_erasure::ReedSolomon;
use ring::digest;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::{self, Debug};
use std::iter;
use messaging::{DistAlgorithm, Target, TargetedMessage};
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
error_chain!{
types {
@ -89,14 +91,10 @@ impl Debug for BroadcastMessage {
/// * So a node with `2 * f + 1` `Ready`s and `f + 1` `Echos` will decode and _output_ the value,
/// knowing that every other good node will eventually do the same.
pub struct Broadcast<N> {
/// The UID of this node.
our_id: N,
/// Shared network data.
netinfo: Rc<NetworkInfo<N>>,
/// The UID of the sending node.
proposer_id: N,
/// UIDs of all nodes for iteration purposes.
all_uids: BTreeSet<N>,
num_nodes: usize,
num_faulty_nodes: usize,
data_shard_num: usize,
coding: Coding,
/// Whether we have already multicast `Echo`.
@ -125,21 +123,21 @@ impl<N: Eq + Debug + Clone + Ord> DistAlgorithm for Broadcast<N> {
type Error = Error;
fn input(&mut self, input: Self::Input) -> BroadcastResult<()> {
if self.our_id != self.proposer_id {
if *self.netinfo.our_uid() != self.proposer_id {
return Err(ErrorKind::InstanceCannotPropose.into());
}
// Split the value into chunks/shards, encode them with erasure codes.
// Assemble a Merkle tree from data and parity shards. Take all proofs
// from this tree and send them, each to its own node.
let proof = self.send_shards(input)?;
let our_uid = &self.netinfo.our_uid().clone();
// TODO: We'd actually need to return the output here, if it was only one node. Should that
// use-case be supported?
let our_id = self.our_id.clone();
self.handle_value(&our_id, proof)
self.handle_value(our_uid, proof)
}
fn handle_message(&mut self, sender_id: &N, message: Self::Message) -> BroadcastResult<()> {
if !self.all_uids.contains(sender_id) {
if !self.netinfo.all_uids().contains(sender_id) {
return Err(ErrorKind::UnknownSender.into());
}
match message {
@ -162,26 +160,21 @@ impl<N: Eq + Debug + Clone + Ord> DistAlgorithm for Broadcast<N> {
}
fn our_id(&self) -> &N {
&self.our_id
self.netinfo.our_uid()
}
}
impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
/// Creates a new broadcast instance to be used by node `our_id` which expects a value proposal
/// from node `proposer_id`.
pub fn new(our_id: N, proposer_id: N, all_uids: BTreeSet<N>) -> BroadcastResult<Self> {
let num_nodes = all_uids.len();
let num_faulty_nodes = (num_nodes - 1) / 3;
let parity_shard_num = 2 * num_faulty_nodes;
let data_shard_num = num_nodes - parity_shard_num;
pub fn new(netinfo: Rc<NetworkInfo<N>>, proposer_id: N) -> BroadcastResult<Self> {
let parity_shard_num = 2 * netinfo.num_faulty();
let data_shard_num = netinfo.num_nodes() - parity_shard_num;
let coding = Coding::new(data_shard_num, parity_shard_num)?;
Ok(Broadcast {
our_id,
netinfo,
proposer_id,
all_uids,
num_nodes,
num_faulty_nodes,
data_shard_num,
coding,
echo_sent: false,
@ -240,7 +233,7 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
let shards_t: Vec<Vec<u8>> = shards
.into_iter()
.enumerate()
.map(|(i, s)| iter::once(i as u8).chain(s.iter().cloned()).collect())
.map(|(i, s)| once(i as u8).chain(s.iter().cloned()).collect())
.collect();
// Convert the Merkle tree into a partial binary tree for later
@ -249,14 +242,14 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
// Default result in case of `gen_proof` error.
let mut result = Err(ErrorKind::ProofConstructionFailed.into());
assert_eq!(self.num_nodes, mtree.iter().count());
assert_eq!(self.netinfo.num_nodes(), mtree.iter().count());
// Send each proof to a node.
for (leaf_value, uid) in mtree.iter().zip(&self.all_uids) {
for (leaf_value, uid) in mtree.iter().zip(self.netinfo.all_uids()) {
let proof = mtree
.gen_proof(leaf_value.to_vec())
.ok_or(ErrorKind::ProofConstructionFailed)?;
if *uid == self.our_id {
if *uid == *self.netinfo.our_uid() {
// The proof is addressed to this node.
result = Ok(proof);
} else {
@ -276,22 +269,27 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
if *sender_id != self.proposer_id {
info!(
"Node {:?} received Value from {:?} instead of {:?}.",
self.our_id, sender_id, self.proposer_id
self.netinfo.our_uid(),
sender_id,
self.proposer_id
);
return Ok(());
}
if self.echo_sent {
info!("Node {:?} received multiple Values.", self.our_id);
info!(
"Node {:?} received multiple Values.",
self.netinfo.our_uid()
);
return Ok(());
}
if !self.validate_proof(&p, &self.our_id) {
if !self.validate_proof(&p, &self.netinfo.our_uid()) {
return Ok(());
}
// Otherwise multicast the proof in an `Echo` message, and handle it ourselves.
self.echo_sent = true;
let our_id = self.our_id.clone();
self.handle_echo(&our_id, p.clone())?;
let our_uid = &self.netinfo.our_uid().clone();
self.handle_echo(our_uid, p.clone())?;
let echo_msg = Target::All.message(BroadcastMessage::Echo(p));
self.messages.push_back(echo_msg);
Ok(())
@ -303,7 +301,8 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
if self.echos.contains_key(sender_id) {
info!(
"Node {:?} received multiple Echos from {:?}.",
self.our_id, sender_id,
self.netinfo.our_uid(),
sender_id,
);
return Ok(());
}
@ -316,7 +315,9 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
// Save the proof for reconstructing the tree later.
self.echos.insert(sender_id.clone(), p);
if self.ready_sent || self.count_echos(&hash) < self.num_nodes - self.num_faulty_nodes {
if self.ready_sent
|| self.count_echos(&hash) < self.netinfo.num_nodes() - self.netinfo.num_faulty()
{
return self.compute_output(&hash);
}
@ -324,8 +325,8 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
self.ready_sent = true;
let ready_msg = Target::All.message(BroadcastMessage::Ready(hash.clone()));
self.messages.push_back(ready_msg);
let our_id = self.our_id.clone();
self.handle_ready(&our_id, &hash)
let our_uid = &self.netinfo.our_uid().clone();
self.handle_ready(our_uid, &hash)
}
/// Handles a received `Ready` message.
@ -334,7 +335,8 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
if self.readys.contains_key(sender_id) {
info!(
"Node {:?} received multiple Readys from {:?}.",
self.our_id, sender_id
self.netinfo.our_uid(),
sender_id
);
return Ok(());
}
@ -343,7 +345,7 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
// Upon receiving f + 1 matching Ready(h) messages, if Ready
// has not yet been sent, multicast Ready(h).
if self.count_readys(hash) == self.num_faulty_nodes + 1 && !self.ready_sent {
if self.count_readys(hash) == self.netinfo.num_faulty() + 1 && !self.ready_sent {
// Enqueue a broadcast of a Ready message.
self.ready_sent = true;
let ready_msg = Target::All.message(BroadcastMessage::Ready(hash.to_vec()));
@ -356,8 +358,8 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
/// value.
fn compute_output(&mut self, hash: &[u8]) -> BroadcastResult<()> {
if self.decided
|| self.count_readys(hash) <= 2 * self.num_faulty_nodes
|| self.count_echos(hash) <= self.num_faulty_nodes
|| self.count_readys(hash) <= 2 * self.netinfo.num_faulty()
|| self.count_echos(hash) <= self.netinfo.num_faulty()
{
return Ok(());
}
@ -365,7 +367,8 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
// Upon receiving 2f + 1 matching Ready(h) messages, wait for N 2f Echo messages.
self.decided = true;
let mut leaf_values: Vec<Option<Box<[u8]>>> = self
.all_uids
.netinfo
.all_uids()
.iter()
.map(|id| {
self.echos.get(id).and_then(|p| {
@ -384,7 +387,7 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
/// Returns `i` if `node_id` is the `i`-th ID among all participating nodes.
fn index_of_node(&self, node_id: &N) -> Option<usize> {
self.all_uids.iter().position(|id| id == node_id)
self.netinfo.all_uids().iter().position(|id| id == node_id)
}
/// Returns `true` if the proof is valid and has the same index as the node ID. Otherwise
@ -393,16 +396,16 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
if !p.validate(&p.root_hash) {
info!(
"Node {:?} received invalid proof: {:?}",
self.our_id,
self.netinfo.our_uid(),
HexProof(&p)
);
false
} else if self.index_of_node(id) != Some(p.value[0] as usize)
|| p.index(self.num_nodes) != p.value[0] as usize
|| p.index(self.netinfo.num_nodes()) != p.value[0] as usize
{
info!(
"Node {:?} received proof for wrong position: {:?}.",
self.our_id,
self.netinfo.our_uid(),
HexProof(&p)
);
false

View File

@ -6,13 +6,14 @@
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug;
use std::hash::Hash;
use std::rc::Rc;
use agreement;
use agreement::{Agreement, AgreementMessage};
use broadcast;
use broadcast::{Broadcast, BroadcastMessage};
use fmt::HexBytes;
use messaging::{DistAlgorithm, Target, TargetedMessage};
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
error_chain!{
types {
@ -98,15 +99,17 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> MessageQueue<NodeUid> {
/// * Once all `Agreement` instances have decided, `CommonSubset` returns the set of all proposed
/// values for which the decision was "yes".
pub struct CommonSubset<NodeUid: Eq + Hash + Ord> {
uid: NodeUid,
num_nodes: usize,
num_faulty_nodes: usize,
/// Shared network information.
netinfo: Rc<NetworkInfo<NodeUid>>,
broadcast_instances: BTreeMap<NodeUid, Broadcast<NodeUid>>,
agreement_instances: BTreeMap<NodeUid, Agreement<NodeUid>>,
broadcast_results: BTreeMap<NodeUid, ProposedValue>,
agreement_results: BTreeMap<NodeUid, bool>,
/// Outgoing message queue.
messages: MessageQueue<NodeUid>,
/// The output value of the algorithm.
output: Option<BTreeMap<NodeUid, ProposedValue>>,
/// Whether the instance has decided on a value.
decided: bool,
}
@ -145,38 +148,32 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for CommonSubset<No
}
fn our_id(&self) -> &Self::NodeUid {
&self.uid
self.netinfo.our_uid()
}
}
impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
pub fn new(uid: NodeUid, all_uids: &BTreeSet<NodeUid>) -> CommonSubsetResult<Self> {
let num_nodes = all_uids.len();
let num_faulty_nodes = (num_nodes - 1) / 3;
pub fn new(netinfo: Rc<NetworkInfo<NodeUid>>) -> CommonSubsetResult<Self> {
let num_nodes = netinfo.num_nodes();
let num_faulty_nodes = netinfo.num_faulty();
// Create all broadcast instances.
let mut broadcast_instances: BTreeMap<NodeUid, Broadcast<NodeUid>> = BTreeMap::new();
for proposer_id in all_uids {
for proposer_id in netinfo.all_uids() {
broadcast_instances.insert(
proposer_id.clone(),
Broadcast::new(
uid.clone(),
proposer_id.clone(),
all_uids.iter().cloned().collect(),
)?,
Broadcast::new(netinfo.clone(), proposer_id.clone())?,
);
}
// Create all agreement instances.
let mut agreement_instances: BTreeMap<NodeUid, Agreement<NodeUid>> = BTreeMap::new();
for proposer_id in all_uids.iter().cloned() {
agreement_instances.insert(proposer_id, Agreement::new(uid.clone(), num_nodes));
for proposer_id in netinfo.all_uids().iter().cloned() {
agreement_instances.insert(proposer_id, Agreement::new(netinfo.clone()));
}
Ok(CommonSubset {
uid,
num_nodes,
num_faulty_nodes,
netinfo,
broadcast_instances,
agreement_instances,
broadcast_results: BTreeMap::new(),
@ -190,7 +187,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
/// Common Subset input message handler. It receives a value for broadcast
/// and redirects it to the corresponding broadcast instance.
pub fn send_proposed_value(&mut self, value: ProposedValue) -> CommonSubsetResult<()> {
let uid = self.uid.clone();
let uid = self.netinfo.our_uid().clone();
// Upon receiving input v_i , input v_i to RBC_i. See Figure 2.
self.process_broadcast(&uid, |bc| bc.input(value))
}
@ -280,10 +277,11 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
}
debug!(
"{:?} Updated Agreement results: {:?}",
self.uid, self.agreement_results
self.netinfo.our_uid(),
self.agreement_results
);
if value && self.count_true() == self.num_nodes - self.num_faulty_nodes {
if value && self.count_true() == self.netinfo.num_nodes() - self.netinfo.num_faulty() {
// Upon delivery of value 1 from at least N f instances of BA, provide
// input 0 to each instance of BA that has not yet been provided input.
for (uid, agreement) in &mut self.agreement_instances {
@ -308,16 +306,20 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
}
fn try_agreement_completion(&mut self) {
if self.decided || self.count_true() < self.num_nodes - self.num_faulty_nodes {
if self.decided || self.count_true() < self.netinfo.num_nodes() - self.netinfo.num_faulty()
{
return;
}
// Once all instances of BA have completed, let C ⊂ [1..N] be
// the indexes of each BA that delivered 1. Wait for the output
// v_j for each RBC_j such that j∈C. Finally output j∈C v_j.
if self.agreement_results.len() < self.num_nodes {
if self.agreement_results.len() < self.netinfo.num_nodes() {
return;
}
debug!("{:?} All Agreement instances have terminated", self.uid);
debug!(
"{:?} All Agreement instances have terminated",
self.netinfo.our_uid()
);
// All instances of Agreement that delivered `true` (or "1" in the paper).
let delivered_1: BTreeSet<&NodeUid> = self
.agreement_results
@ -336,7 +338,10 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> CommonSubset<NodeUid> {
.collect();
if delivered_1.len() == broadcast_results.len() {
debug!("{:?} Agreement instances completed:", self.uid);
debug!(
"{:?} Agreement instances completed:",
self.netinfo.our_uid()
);
for (uid, result) in &broadcast_results {
debug!(" {:?} → {:?}", uid, HexBytes(&result));
}

View File

@ -2,6 +2,7 @@ use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug;
use std::hash::Hash;
use std::rc::Rc;
use std::{cmp, iter};
use bincode;
@ -10,7 +11,7 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use common_subset::{self, CommonSubset};
use messaging::{DistAlgorithm, TargetedMessage};
use messaging::{DistAlgorithm, NetworkInfo, TargetedMessage};
error_chain!{
types {
@ -33,6 +34,8 @@ error_chain!{
/// An instance of the Honey Badger Byzantine fault tolerant consensus algorithm.
pub struct HoneyBadger<T, N: Eq + Hash + Ord + Clone> {
/// Shared network data.
netinfo: Rc<NetworkInfo<N>>,
/// The buffer of transactions that have not yet been included in any output batch.
buffer: Vec<T>,
/// The earliest epoch from which we have not yet received output.
@ -40,10 +43,6 @@ pub struct HoneyBadger<T, N: Eq + Hash + Ord + Clone> {
/// The Asynchronous Common Subset instance that decides which nodes' transactions to include,
/// indexed by epoch.
common_subsets: BTreeMap<u64, CommonSubset<N>>,
/// This node's ID.
id: N,
/// The set of all node IDs of the participants (including ourselves).
all_uids: BTreeSet<N>,
/// The target number of transactions to be included in each batch.
// TODO: Do experiments and recommend a batch size. It should be proportional to
// `num_nodes * num_nodes * log(num_nodes)`.
@ -70,7 +69,7 @@ where
}
fn handle_message(&mut self, sender_id: &N, message: Self::Message) -> HoneyBadgerResult<()> {
if !self.all_uids.contains(sender_id) {
if !self.netinfo.all_uids().contains(sender_id) {
return Err(ErrorKind::UnknownSender.into());
}
match message {
@ -93,7 +92,7 @@ where
}
fn our_id(&self) -> &N {
&self.id
self.netinfo.our_uid()
}
}
@ -105,7 +104,7 @@ where
{
/// Returns a new Honey Badger instance with the given parameters, starting at epoch `0`.
pub fn new<I, TI>(
id: N,
our_uid: N,
all_uids_iter: I,
batch_size: usize,
txs: TI,
@ -115,16 +114,15 @@ where
TI: IntoIterator<Item = T>,
{
let all_uids: BTreeSet<N> = all_uids_iter.into_iter().collect();
if !all_uids.contains(&id) {
if !all_uids.contains(&our_uid) {
return Err(ErrorKind::OwnIdMissing.into());
}
let mut honey_badger = HoneyBadger {
netinfo: Rc::new(NetworkInfo::new(our_uid, all_uids)),
buffer: txs.into_iter().collect(),
epoch: 0,
common_subsets: BTreeMap::new(),
id,
batch_size,
all_uids,
messages: MessageQueue(VecDeque::new()),
output: VecDeque::new(),
};
@ -143,9 +141,7 @@ where
let proposal = self.choose_transactions()?;
let cs = match self.common_subsets.entry(self.epoch) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
entry.insert(CommonSubset::new(self.id.clone(), &self.all_uids)?)
}
Entry::Vacant(entry) => entry.insert(CommonSubset::new(self.netinfo.clone())?),
};
cs.input(proposal)?;
self.messages.extend_with_epoch(self.epoch, cs);
@ -156,7 +152,7 @@ where
/// serializes them.
fn choose_transactions(&self) -> HoneyBadgerResult<Vec<u8>> {
let mut rng = rand::thread_rng();
let amount = cmp::max(1, self.batch_size / self.all_uids.len());
let amount = cmp::max(1, self.batch_size / self.netinfo.all_uids().len());
let batch_size = cmp::min(self.batch_size, self.buffer.len());
let sample = match rand::seq::sample_iter(&mut rng, &self.buffer[..batch_size], amount) {
Ok(choice) => choice,
@ -164,7 +160,9 @@ where
};
debug!(
"{:?} Proposing in epoch {}: {:?}",
self.id, self.epoch, sample
self.netinfo.our_uid(),
self.epoch,
sample
);
Ok(bincode::serialize(&sample)?)
}
@ -184,7 +182,7 @@ where
if epoch < self.epoch {
return Ok(()); // Epoch has already terminated. Message is obsolete.
} else {
entry.insert(CommonSubset::new(self.id.clone(), &self.all_uids)?)
entry.insert(CommonSubset::new(self.netinfo.clone())?)
}
}
};
@ -217,7 +215,9 @@ where
self.buffer.retain(|tx| !transactions.contains(tx));
debug!(
"{:?} Epoch {} output {:?}",
self.id, self.epoch, transactions
self.netinfo.our_uid(),
self.epoch,
transactions
);
// Queue the output and advance the epoch.
self.output.push_back(Batch {
@ -248,7 +248,11 @@ where
.get(&epoch)
.map_or(false, CommonSubset::terminated)
{
debug!("{:?} Epoch {} has terminated.", self.id, epoch);
debug!(
"{:?} Epoch {} has terminated.",
self.netinfo.our_uid(),
epoch
);
self.common_subsets.remove(&epoch);
}
}

View File

@ -1,3 +1,4 @@
use std::collections::BTreeSet;
use std::fmt::Debug;
/// Message sent by a given source.
@ -9,10 +10,9 @@ pub struct SourcedMessage<M, N> {
/// Message destination can be either of the two:
///
/// 1) `All`: all nodes if sent to socket tasks, or all local algorithm
/// instances if received from socket tasks.
/// 1) `All`: all remote nodes.
///
/// 2) `Node(i)`: node i or local algorithm instances with the node index i.
/// 2) `Node(id)`: remote node `id`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Target<N> {
All,
@ -124,3 +124,44 @@ impl<'a, D: DistAlgorithm + 'a> Iterator for OutputIter<'a, D> {
self.algorithm.next_output()
}
}
/// Common data shared between algorithms.
pub struct NetworkInfo<NodeUid> {
our_uid: NodeUid,
all_uids: BTreeSet<NodeUid>,
num_nodes: usize,
num_faulty: usize,
}
impl<NodeUid> NetworkInfo<NodeUid> {
pub fn new(our_uid: NodeUid, all_uids: BTreeSet<NodeUid>) -> Self {
let num_nodes = all_uids.len();
NetworkInfo {
our_uid,
all_uids,
num_nodes,
num_faulty: (num_nodes - 1) / 3,
}
}
/// The ID of the node the algorithm runs on.
pub fn our_uid(&self) -> &NodeUid {
&self.our_uid
}
/// ID of all nodes in the network.
pub fn all_uids(&self) -> &BTreeSet<NodeUid> {
&self.all_uids
}
/// The total number of nodes.
pub fn num_nodes(&self) -> usize {
self.num_nodes
}
/// The maximum number of faulty, Byzantine nodes up to which Honey Badger is guaranteed to be
/// correct.
pub fn num_faulty(&self) -> usize {
self.num_faulty
}
}

View File

@ -21,12 +21,14 @@ extern crate rand;
mod network;
use std::collections::BTreeSet;
use std::iter;
use std::iter::once;
use std::rc::Rc;
use rand::Rng;
use hbbft::agreement::Agreement;
use hbbft::messaging::NetworkInfo;
use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork, TestNode};
fn test_agreement<A: Adversary<Agreement<NodeUid>>>(
@ -46,7 +48,7 @@ fn test_agreement<A: Adversary<Agreement<NodeUid>>>(
let mut expected = input;
for node in network.nodes.values() {
if let Some(b) = expected {
assert!(iter::once(&b).eq(node.outputs()));
assert!(once(&b).eq(node.outputs()));
} else {
assert_eq!(1, node.outputs().len());
expected = Some(node.outputs()[0]);
@ -64,8 +66,8 @@ where
let mut rng = rand::thread_rng();
let sizes = (1..6)
.chain(iter::once(rng.gen_range(6, 20)))
.chain(iter::once(rng.gen_range(30, 50)));
.chain(once(rng.gen_range(6, 20)))
.chain(once(rng.gen_range(30, 50)));
for size in sizes {
let num_faulty_nodes = (size - 1) / 3;
let num_good_nodes = size - num_faulty_nodes;
@ -75,7 +77,7 @@ where
);
for &input in &[None, Some(false), Some(true)] {
let adversary = new_adversary(num_good_nodes, num_faulty_nodes);
let new_agreement = |id, all_ids: BTreeSet<_>| Agreement::new(id, all_ids.len());
let new_agreement = |netinfo: Rc<NetworkInfo<NodeUid>>| Agreement::new(netinfo);
let network =
TestNetwork::new(num_good_nodes, num_faulty_nodes, adversary, new_agreement);
test_agreement(network, input);

View File

@ -9,12 +9,13 @@ extern crate rand;
mod network;
use std::collections::{BTreeMap, BTreeSet};
use std::iter;
use std::iter::once;
use std::rc::Rc;
use rand::Rng;
use hbbft::broadcast::{Broadcast, BroadcastMessage};
use hbbft::messaging::{DistAlgorithm, TargetedMessage};
use hbbft::messaging::{DistAlgorithm, NetworkInfo, TargetedMessage};
use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork, TestNode};
/// An adversary that inputs an alternate value.
@ -65,7 +66,8 @@ impl Adversary<Broadcast<NodeUid>> for ProposeAdversary {
Some(id) => *id,
None => return vec![],
};
let mut bc = Broadcast::new(id, id, node_ids).expect("broadcast instance");
let netinfo = Rc::new(NetworkInfo::new(id, node_ids));
let mut bc = Broadcast::new(netinfo, id).expect("broadcast instance");
bc.input(b"Fake news".to_vec()).expect("propose");
bc.message_iter().map(|msg| (id, msg)).collect()
}
@ -88,12 +90,12 @@ fn test_broadcast<A: Adversary<Broadcast<NodeUid>>>(
}
// Verify that all instances output the proposed value.
for node in network.nodes.values() {
assert!(iter::once(&proposed_value.to_vec()).eq(node.outputs()));
assert!(once(&proposed_value.to_vec()).eq(node.outputs()));
}
}
fn new_broadcast(id: NodeUid, all_ids: BTreeSet<NodeUid>) -> Broadcast<NodeUid> {
Broadcast::new(id, NodeUid(0), all_ids).expect("Instantiate broadcast")
fn new_broadcast(netinfo: Rc<NetworkInfo<NodeUid>>) -> Broadcast<NodeUid> {
Broadcast::new(netinfo, NodeUid(0)).expect("Instantiate broadcast")
}
fn test_broadcast_different_sizes<A, F>(new_adversary: F, proposed_value: &[u8])
@ -103,8 +105,8 @@ where
{
let mut rng = rand::thread_rng();
let sizes = (1..6)
.chain(iter::once(rng.gen_range(6, 20)))
.chain(iter::once(rng.gen_range(30, 50)));
.chain(once(rng.gen_range(6, 20)))
.chain(once(rng.gen_range(30, 50)));
for size in sizes {
let num_faulty_nodes = (size - 1) / 3;
let num_good_nodes = size - num_faulty_nodes;

View File

@ -9,9 +9,11 @@ extern crate rand;
mod network;
use std::collections::{BTreeMap, BTreeSet};
use std::iter;
use std::iter::once;
use std::rc::Rc;
use hbbft::common_subset::CommonSubset;
use hbbft::messaging::NetworkInfo;
use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork, TestNode};
@ -37,7 +39,7 @@ fn test_common_subset<A: Adversary<CommonSubset<NodeUid>>>(
let mut expected = None;
for node in network.nodes.values() {
if let Some(output) = expected.as_ref() {
assert!(iter::once(output).eq(node.outputs()));
assert!(once(output).eq(node.outputs()));
continue;
}
assert_eq!(1, node.outputs().len());
@ -61,8 +63,8 @@ fn new_network<A: Adversary<CommonSubset<NodeUid>>>(
// This returns an error in all but the first test.
let _ = env_logger::try_init();
let new_common_subset = |id, all_ids: BTreeSet<_>| {
CommonSubset::new(id, &all_ids).expect("new Common Subset instance")
let new_common_subset = |netinfo: Rc<NetworkInfo<NodeUid>>| {
CommonSubset::new(netinfo).expect("new Common Subset instance")
};
TestNetwork::new(good_num, bad_num, adversary, new_common_subset)
}

View File

@ -8,12 +8,14 @@ extern crate rand;
mod network;
use std::collections::BTreeSet;
use std::iter;
use std::iter::once;
use std::rc::Rc;
use rand::Rng;
use hbbft::honey_badger::{self, HoneyBadger};
use hbbft::messaging::NetworkInfo;
use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork, TestNode};
/// Proposes `num_txs` values and expects nodes to output and order them.
@ -55,8 +57,10 @@ where
// TODO: Verify that all nodes output the same epochs.
}
fn new_honey_badger(id: NodeUid, all_ids: BTreeSet<NodeUid>) -> HoneyBadger<usize, NodeUid> {
HoneyBadger::new(id, all_ids, 12, 0..5).expect("Instantiate honey_badger")
fn new_honey_badger(netinfo: Rc<NetworkInfo<NodeUid>>) -> HoneyBadger<usize, NodeUid> {
let our_uid = netinfo.our_uid().clone();
let all_uids = netinfo.all_uids().clone();
HoneyBadger::new(our_uid, all_uids, 12, 0..5).expect("Instantiate honey_badger")
}
fn test_honey_badger_different_sizes<A, F>(new_adversary: F, num_txs: usize)
@ -69,8 +73,8 @@ where
let mut rng = rand::thread_rng();
let sizes = (4..5)
.chain(iter::once(rng.gen_range(6, 10)))
.chain(iter::once(rng.gen_range(11, 15)));
.chain(once(rng.gen_range(6, 10)))
.chain(once(rng.gen_range(11, 15)));
for size in sizes {
let num_faulty_nodes = (size - 1) / 3;
let num_good_nodes = size - num_faulty_nodes;

View File

@ -1,9 +1,10 @@
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug;
use std::rc::Rc;
use rand::{self, Rng};
use hbbft::messaging::{DistAlgorithm, Target, TargetedMessage};
use hbbft::messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
/// A node identifier. In the tests, nodes are simply numbered.
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)]
@ -154,10 +155,15 @@ where
/// `adv_num` nodes.
pub fn new<F>(good_num: usize, adv_num: usize, adversary: A, new_algo: F) -> TestNetwork<A, D>
where
F: Fn(NodeUid, BTreeSet<NodeUid>) -> D,
F: Fn(Rc<NetworkInfo<NodeUid>>) -> D,
{
let node_ids: BTreeSet<NodeUid> = (0..(good_num + adv_num)).map(NodeUid).collect();
let new_node_by_id = |id: NodeUid| (id, TestNode::new(new_algo(id, node_ids.clone())));
let new_node_by_id = |id: NodeUid| {
(
id,
TestNode::new(new_algo(Rc::new(NetworkInfo::new(id, node_ids.clone())))),
)
};
let mut network = TestNetwork {
nodes: (0..good_num).map(NodeUid).map(new_node_by_id).collect(),
adversary,