Optimized broadcast #309 (#405)

* Added extra message types

* Add send functions for new message types

* Store original value message received from proposer

* Modify handle_value for optimized broadcast

* Modify handle_echo for optimized broadcast

* Add handle_echo_hash function for optimized broadcast

* Add handle_can_decode function for optimized broadcast

* Fix handle_ready and send_echo functions:
1) Modify handle_ready function for optimized broadcast
2) Modify send_echo function to send `Echo` messages to a different subset of nodes depending on
whether it is called from handle_value or handle_ready

* Remove value_message and fix typos

* Add functions for filtering all_ids

* Separate send_echo to send_echo_left and send_echo_remaining

* Rename pessimism_factor to fault_estimate

* Remove redundant bools from Broadcast struct

* Fix double-counting of nodes that sent both `Echo` and `EchoHash` by changing the
`echos` map structure

* Allow conflicting `CanDecode`s from the same node

* Fix left and right iterators for `Echo` and `EchoHash` messages

* Fix bugs in left and right iterators and add additional checks in handle
functions

* Change can_decodes to BTreeMap<Digest, BTreeSet<N>> and fix send_can_decode

* Minor fixes

* Modify send_echo_remaining to take a hash parameter

* Fix bug in left and right iterators: excluding the proposer from the iterator led to an
infinite loop when our_id == proposer_id (see the iterator sketch below)

* Fix bug in handle_echo and compute_output
* send_can_decode call in handle_echo returned early
* compute_output needed `N - f` full `Echo`s to decode

* Refactor `echos` map to take an `EchoContent` enum for `Echo` and `EchoHash` messages

* Run rustfmt

* Refactor to avoid index access and multiple map lookups

* Fix comments and minor refactorings.

* Add an `AllExcept(BTreeSet<N>)` variant to the `Target` enum to enable sending messages
to non-validators from Broadcast.
* Use `Target::AllExcept` in Broadcast to send `Echo` messages to all non-validator nodes.
* Add `AllExcept(_)` match arms to `Target` match expressions.

* Rename `AllExcept` parameter from `known` to `exclude`.

* Modify send_can_decode to send to all nodes that haven't sent an `Echo`.

* Update docs for broadcast

* Improve formatting and add comments for broadcast docs.

* Fix formatting.

* Allow for sending multiple `CanDecode` messages with different hashes.

* Fix comments.

* Fix bug in sending `Echo`s when node has not received `CanDecode`.
Authored by Pawan Dhananjay on 2019-06-12 20:32:39 +05:30; committed by Vladimir Komendantskiy
parent 15f7313706
commit 61f4ed9800
10 changed files with 424 additions and 41 deletions
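
Several of the commits above refer to the "left" and "right" node sets used by send_echo_left, send_echo_hash and send_echo_remaining. A minimal, self-contained sketch of that circular split, assuming the ids arrive as an ordered slice (the free-function form and all names are illustrative; the diffs below express the same pattern inline via `all_ids().cycle()`):

```rust
/// Sketch: the "left" ids form a window of `N - 2f + g` ids starting at our
/// own (minus ourselves) in a circular ordering; they receive full `Echo`s.
/// The remaining "right" ids receive `EchoHash`. `num_faulty` is `f`,
/// `fault_estimate` is `g`.
fn split_left_right<N: Eq + Clone>(
    all_ids: &[N],
    our_id: &N,
    num_faulty: usize,
    fault_estimate: usize,
) -> (Vec<N>, Vec<N>) {
    let num_correct = all_ids.len() - num_faulty; // N - f
    let window = num_correct - num_faulty + fault_estimate; // N - 2f + g
    // `skip_while` would spin forever if `our_id` were missing from `all_ids`;
    // this is why excluding the proposer up front looped infinitely when
    // `our_id == proposer_id`.
    let left = all_ids
        .iter()
        .cycle()
        .skip_while(|x| *x != our_id)
        .take(window)
        .skip(1) // drop our own id
        .cloned()
        .collect();
    let right = all_ids
        .iter()
        .cycle()
        .skip_while(|x| *x != our_id)
        .skip(window)
        .take_while(|x| *x != our_id)
        .cloned()
        .collect();
    (left, right)
}
```

For example, with `N = 4`, `f = 1` and `g = 1`, each validator sends full `Echo`s to the two ids following its own and an `EchoHash` to the one remaining validator.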


@@ -2,6 +2,7 @@
use crossbeam::thread::{Scope, ScopedJoinHandle};
use crossbeam_channel::{self, bounded, select, unbounded, Receiver, Sender};
use hbbft::{SourcedMessage, Target, TargetedMessage};
use std::collections::BTreeSet;
/// The queue functionality for messages sent between algorithm instances.
/// The messaging struct allows for targeted message exchange between comms
@@ -107,7 +108,7 @@ impl<M: Send> Messaging<M> {
select! {
recv(rx_from_algo) -> tm => {
if let Ok(tm) = tm {
match tm.target {
match &tm.target {
Target::All => {
// Send the message to all remote nodes, stopping at the first
// error.
@@ -120,9 +121,26 @@ impl<M: Send> Messaging<M> {
}
}).map_err(Error::from);
},
Target::AllExcept(exclude) => {
// Send the message to all remote nodes not in `exclude`, stopping at the first
// error.
let filtered_txs: Vec<_> = (0..txs_to_comms.len())
.collect::<BTreeSet<_>>()
.difference(exclude)
.cloned()
.collect();
result = filtered_txs.iter()
.fold(Ok(()), |result, i| {
if result.is_ok() {
txs_to_comms[*i].send(tm.message.clone())
} else {
result
}
}).map_err(Error::from);
},
Target::Node(i) => {
result = if i < txs_to_comms.len() {
txs_to_comms[i].send(tm.message)
result = if *i < txs_to_comms.len() {
txs_to_comms[*i].send(tm.message)
.map_err(Error::from)
} else {
Err(Error::NoSuchTarget)


@@ -273,7 +273,7 @@ where
Q: IntoIterator<Item = TimestampedMessage<D>>,
{
for ts_msg in msgs {
match ts_msg.target {
match &ts_msg.target {
Target::All => {
for node in self.nodes.values_mut() {
if node.id != ts_msg.sender_id {
@@ -281,6 +281,13 @@ where
}
}
}
Target::AllExcept(exclude) => {
for node in self.nodes.values_mut().filter(|n| !exclude.contains(&n.id)) {
if node.id != ts_msg.sender_id {
node.add_message(ts_msg.clone())
}
}
}
Target::Node(to_id) => {
if let Some(node) = self.nodes.get_mut(&to_id) {
node.add_message(ts_msg);


@@ -257,6 +257,22 @@ where
message_count = message_count.saturating_add(1);
}
dest.push_back(NetworkMessage::new(
stepped_id.clone(),
tmsg.message.clone(),
to.clone(),
));
}
}
hbbft::Target::AllExcept(exclude) => {
for to in nodes
.keys()
.filter(|&to| to != &stepped_id && !exclude.contains(to))
{
if !faulty {
message_count = message_count.saturating_add(1);
}
dest.push_back(NetworkMessage::new(
stepped_id.clone(),
tmsg.message.clone(),


@@ -1,4 +1,5 @@
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::sync::Arc;
use std::{fmt, result};
@@ -28,14 +29,23 @@ pub struct Broadcast<N> {
coding: Coding,
/// If we are the proposer: whether we have already sent the `Value` messages with the shards.
value_sent: bool,
/// Whether we have already multicast `Echo`.
/// Whether we have already sent `Echo` to all nodes who haven't sent `CanDecode`.
echo_sent: bool,
/// Whether we have already multicast `Ready`.
ready_sent: bool,
/// Whether we have already sent `EchoHash` to the right nodes.
echo_hash_sent: bool,
/// Whether we have already sent `CanDecode` for the given hash.
can_decode_sent: BTreeSet<Digest>,
/// Whether we have already output a value.
decided: bool,
/// The proofs we have received via `Echo` messages, by sender ID.
echos: BTreeMap<N, Proof<Vec<u8>>>,
/// Number of faulty nodes to optimize performance for.
fault_estimate: usize,
/// The hashes and proofs we have received via `Echo` and `EchoHash` messages, by sender ID.
echos: BTreeMap<N, EchoContent>,
/// The hashes we have received from nodes via `CanDecode` messages, by hash.
/// A node can receive conflicting `CanDecode`s from the same node.
can_decodes: BTreeMap<Digest, BTreeSet<N>>,
/// The root hashes we received via `Ready` messages, by sender ID.
readys: BTreeMap<N, Vec<u8>>,
}
@@ -81,6 +91,7 @@ impl<N: NodeIdT> Broadcast<N> {
let data_shard_num = netinfo.num_nodes() - parity_shard_num;
let coding =
Coding::new(data_shard_num, parity_shard_num).map_err(|_| Error::InvalidNodeCount)?;
let fault_estimate = netinfo.num_faulty();
Ok(Broadcast {
netinfo,
@@ -89,8 +100,12 @@ impl<N: NodeIdT> Broadcast<N> {
value_sent: false,
echo_sent: false,
ready_sent: false,
echo_hash_sent: false,
can_decode_sent: BTreeSet::new(),
decided: false,
fault_estimate,
echos: BTreeMap::new(),
can_decodes: BTreeMap::new(),
readys: BTreeMap::new(),
})
}
@@ -123,6 +138,8 @@ impl<N: NodeIdT> Broadcast<N> {
Message::Value(p) => self.handle_value(sender_id, p),
Message::Echo(p) => self.handle_echo(sender_id, p),
Message::Ready(ref hash) => self.handle_ready(sender_id, hash),
Message::CanDecode(ref hash) => self.handle_can_decode(sender_id, hash),
Message::EchoHash(ref hash) => self.handle_echo_hash(sender_id, hash),
}
}
@@ -200,8 +217,14 @@ impl<N: NodeIdT> Broadcast<N> {
let fault_kind = FaultKind::ReceivedValueFromNonProposer;
return Ok(Fault::new(sender_id.clone(), fault_kind).into());
}
if self.echo_sent {
if self.echos.get(self.our_id()) == Some(&p) {
match self.echos.get(self.our_id()) {
// Multiple values from proposer.
Some(val) if val.hash() != p.root_hash() => {
return Ok(Fault::new(sender_id.clone(), FaultKind::MultipleValues).into())
}
// Already received proof.
Some(EchoContent::Full(proof)) if *proof == p => {
warn!(
"Node {:?} received Value({:?}) multiple times from {:?}.",
self.our_id(),
@@ -209,24 +232,26 @@ impl<N: NodeIdT> Broadcast<N> {
sender_id
);
return Ok(Step::default());
} else {
return Ok(Fault::new(sender_id.clone(), FaultKind::MultipleValues).into());
}
}
_ => (),
};
// If the proof is invalid, log the faulty node behavior and ignore.
if !self.validate_proof(&p, &self.our_id()) {
return Ok(Fault::new(sender_id.clone(), FaultKind::InvalidProof).into());
}
// Otherwise multicast the proof in an `Echo` message, and handle it ourselves.
self.send_echo(p)
// Send the proof in an `Echo` message to left nodes
// and `EchoHash` message to right nodes and handle the response.
let echo_hash_steps = self.send_echo_hash(p.root_hash())?;
let echo_steps = self.send_echo_left(p)?;
Ok(echo_steps.join(echo_hash_steps))
}
/// Handles a received `Echo` message.
fn handle_echo(&mut self, sender_id: &N, p: Proof<Vec<u8>>) -> Result<Step<N>> {
// If the sender has already sent `Echo`, ignore.
if let Some(old_p) = self.echos.get(sender_id) {
if let Some(EchoContent::Full(old_p)) = self.echos.get(sender_id) {
if *old_p == p {
warn!(
"Node {:?} received Echo({:?}) multiple times from {:?}.",
@@ -240,6 +265,14 @@ impl<N: NodeIdT> Broadcast<N> {
}
}
// Case where we have received an earlier `EchoHash`
// message from sender_id with different root_hash.
if let Some(EchoContent::Hash(hash)) = self.echos.get(sender_id) {
if hash != p.root_hash() {
return Ok(Fault::new(sender_id.clone(), FaultKind::MultipleEchos).into());
}
}
// If the proof is invalid, log the faulty-node behavior, and ignore.
if !self.validate_proof(&p, sender_id) {
return Ok(Fault::new(sender_id.clone(), FaultKind::InvalidProof).into());
@@ -248,16 +281,85 @@ impl<N: NodeIdT> Broadcast<N> {
let hash = *p.root_hash();
// Save the proof for reconstructing the tree later.
self.echos.insert(sender_id.clone(), p);
self.echos.insert(sender_id.clone(), EchoContent::Full(p));
let mut step = Step::default();
// Upon receiving `N - 2f` `Echo`s with this root hash, send `CanDecode`
if !self.can_decode_sent.contains(&hash)
&& self.count_echos_full(&hash) >= self.coding.data_shard_count()
{
step.extend(self.send_can_decode(&hash)?);
}
// Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`.
if !self.ready_sent && self.count_echos(&hash) >= self.netinfo.num_correct() {
step.extend(self.send_ready(&hash)?);
}
// Computes the output if we have the required number of `Echo`s and `Ready`s;
// otherwise returns `Step::default()`.
if self.ready_sent {
step.extend(self.compute_output(&hash)?);
}
Ok(step)
}
fn handle_echo_hash(&mut self, sender_id: &N, hash: &Digest) -> Result<Step<N>> {
// If the sender has already sent `EchoHash`, ignore.
if let Some(EchoContent::Hash(old_hash)) = self.echos.get(sender_id) {
if old_hash == hash {
warn!(
"Node {:?} received EchoHash({:?}) multiple times from {:?}.",
self.our_id(),
hash,
sender_id,
);
return Ok(Step::default());
} else {
return Ok(Fault::new(sender_id.clone(), FaultKind::MultipleEchoHashes).into());
}
}
// If the sender has already sent an `Echo` for the same hash, ignore.
if let Some(EchoContent::Full(p)) = self.echos.get(sender_id) {
if p.root_hash() == hash {
return Ok(Step::default());
} else {
return Ok(Fault::new(sender_id.clone(), FaultKind::MultipleEchoHashes).into());
}
}
// Save the hash for counting later.
self.echos
.insert(sender_id.clone(), EchoContent::Hash(*hash));
if self.ready_sent || self.count_echos(&hash) < self.netinfo.num_correct() {
return self.compute_output(&hash);
}
// Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`.
self.send_ready(&hash)
}
/// Handles a received `CanDecode` message.
fn handle_can_decode(&mut self, sender_id: &N, hash: &Digest) -> Result<Step<N>> {
// Save the hash for counting later. If hash from sender_id already exists, emit a warning.
if let Some(nodes) = self.can_decodes.get(hash) {
if nodes.contains(sender_id) {
warn!(
"Node {:?} received same CanDecode({:?}) multiple times from {:?}.",
self.our_id(),
hash,
sender_id,
);
}
}
self.can_decodes
.entry(*hash)
.or_default()
.insert(sender_id.clone());
Ok(Step::default())
}
/// Handles a received `Ready` message.
fn handle_ready(&mut self, sender_id: &N, hash: &Digest) -> Result<Step<N>> {
// If the sender has already sent a `Ready` before, ignore.
@@ -284,21 +386,128 @@ impl<N: NodeIdT> Broadcast<N> {
// Enqueue a broadcast of a Ready message.
step.extend(self.send_ready(hash)?);
}
// Upon receiving 2f + 1 matching Ready(h) messages, send full
// `Echo` message to every node who hasn't sent us a `CanDecode`
if self.count_readys(hash) == 2 * self.netinfo.num_faulty() + 1 {
step.extend(self.send_echo_remaining(hash)?);
}
Ok(step.join(self.compute_output(hash)?))
}
/// Sends an `Echo` message and handles it. Does nothing if we are only an observer.
fn send_echo(&mut self, p: Proof<Vec<u8>>) -> Result<Step<N>> {
self.echo_sent = true;
/// Sends `Echo` message to all left nodes and handles it.
fn send_echo_left(&mut self, p: Proof<Vec<u8>>) -> Result<Step<N>> {
if !self.netinfo.is_validator() {
return Ok(Step::default());
}
let echo_msg = Message::Echo(p.clone());
let step: Step<_> = Target::All.message(echo_msg).into();
let mut step = Step::default();
// `N - 2f + g` node ids to the left of our_id (excluding our_id)
// after arranging all node ids in a circular list.
let left = self
.netinfo
.all_ids()
.cycle()
.skip_while(|x| *x != self.our_id())
.take(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate)
.skip(1);
for id in left {
let msg = Target::Node(id.clone()).message(echo_msg.clone());
step.messages.push(msg);
}
// Send `Echo` message to all non-validating nodes.
step.extend(
Target::AllExcept(self.netinfo.all_ids().cloned().collect::<BTreeSet<_>>())
.message(echo_msg)
.into(),
);
let our_id = &self.our_id().clone();
Ok(step.join(self.handle_echo(our_id, p)?))
}
/// Sends `Echo` message to remaining nodes who haven't sent `CanDecode`
fn send_echo_remaining(&mut self, hash: &Digest) -> Result<Step<N>> {
self.echo_sent = true;
if !self.netinfo.is_validator() {
return Ok(Step::default());
}
let p = match self.echos.get(self.our_id()) {
// Haven't received `Echo`.
None | Some(EchoContent::Hash(_)) => return Ok(Step::default()),
// Received `Echo` for different hash.
Some(EchoContent::Full(p)) if p.root_hash() != hash => return Ok(Step::default()),
Some(EchoContent::Full(p)) => p.clone(),
};
let echo_msg = Message::Echo(p);
let mut step = Step::default();
let senders = self.can_decodes.get(hash);
// Remaining node ids to the right of our_id
// after arranging all node ids in a circular list.
let right = self
.netinfo
.all_ids()
.cycle()
.skip_while(|x| *x != self.our_id())
.skip(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate)
.take_while(|x| *x != self.our_id());
let msgs = right
.filter(|id| senders.map_or(true, |s| !s.contains(id)))
.map(|id| Target::Node(id.clone()).message(echo_msg.clone()));
step.messages.extend(msgs);
Ok(step)
}
/// Sends an `EchoHash` message and handles it. Does nothing if we are only an observer.
fn send_echo_hash(&mut self, hash: &Digest) -> Result<Step<N>> {
self.echo_hash_sent = true;
if !self.netinfo.is_validator() {
return Ok(Step::default());
}
let echo_hash_msg = Message::EchoHash(*hash);
let mut step = Step::default();
// Remaining node ids to the right of our_id
// after arranging all node ids in a circular list.
let right = self
.netinfo
.all_ids()
.cycle()
.skip_while(|x| *x != self.our_id())
.skip(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate)
.take_while(|x| *x != self.our_id());
for id in right {
let msg = Target::Node(id.clone()).message(echo_hash_msg.clone());
step.messages.push(msg);
}
let our_id = &self.our_id().clone();
Ok(step.join(self.handle_echo_hash(our_id, hash)?))
}
/// Sends a `CanDecode` message and handles it. Does nothing if we are only an observer.
fn send_can_decode(&mut self, hash: &Digest) -> Result<Step<N>> {
self.can_decode_sent.insert(hash.clone());
if !self.netinfo.is_validator() {
return Ok(Step::default());
}
let can_decode_msg = Message::CanDecode(*hash);
let mut step = Step::default();
for id in self.netinfo.all_ids() {
match self.echos.get(id) {
Some(EchoContent::Hash(_)) | None => {
let msg = Target::Node(id.clone()).message(can_decode_msg.clone());
step.messages.push(msg);
}
_ => (),
}
}
let our_id = &self.our_id().clone();
Ok(step.join(self.handle_can_decode(our_id, hash)?))
}
/// Sends a `Ready` message and handles it. Does nothing if we are only an observer.
fn send_ready(&mut self, hash: &Digest) -> Result<Step<N>> {
self.ready_sent = true;
@@ -316,7 +525,7 @@ impl<N: NodeIdT> Broadcast<N> {
fn compute_output(&mut self, hash: &Digest) -> Result<Step<N>> {
if self.decided
|| self.count_readys(hash) <= 2 * self.netinfo.num_faulty()
|| self.count_echos(hash) < self.coding.data_shard_count()
|| self.count_echos_full(hash) < self.coding.data_shard_count()
{
return Ok(Step::default());
}
@@ -326,13 +535,16 @@ impl<N: NodeIdT> Broadcast<N> {
.netinfo
.all_ids()
.map(|id| {
self.echos.get(id).and_then(|p| {
if p.root_hash() == hash {
Some(p.value().clone().into_boxed_slice())
} else {
None
}
})
self.echos
.get(id)
.and_then(EchoContent::proof)
.and_then(|p| {
if p.root_hash() == hash {
Some(p.value().clone().into_boxed_slice())
} else {
None
}
})
})
.collect();
if let Some(value) = self.decode_from_shards(&mut leaf_values, hash) {
@@ -392,14 +604,20 @@ impl<N: NodeIdT> Broadcast<N> {
self.netinfo.node_index(id) == Some(p.index()) && p.validate(self.netinfo.num_nodes())
}
/// Returns the number of nodes that have sent us an `Echo` message with this hash.
fn count_echos(&self, hash: &Digest) -> usize {
/// Returns the number of nodes that have sent us a full `Echo` message with this hash.
fn count_echos_full(&self, hash: &Digest) -> usize {
self.echos
.values()
.filter_map(EchoContent::proof)
.filter(|p| p.root_hash() == hash)
.count()
}
/// Returns the number of nodes that have sent us an `Echo` or `EchoHash` message with this hash.
fn count_echos(&self, hash: &Digest) -> usize {
self.echos.values().filter(|v| v.hash() == hash).count()
}
/// Returns the number of nodes that have sent us a `Ready` message with this hash.
fn count_readys(&self, hash: &Digest) -> usize {
self.readys
@@ -473,3 +691,30 @@ impl Coding {
}
}
}
/// Content for `EchoHash` and `Echo` messages.
#[derive(Debug)]
enum EchoContent {
/// `EchoHash` message.
Hash(Digest),
/// `Echo` message
Full(Proof<Vec<u8>>),
}
impl EchoContent {
/// Returns the hash of the message for either message type.
pub fn hash(&self) -> &Digest {
match &self {
EchoContent::Hash(h) => h,
EchoContent::Full(p) => p.root_hash(),
}
}
/// Returns the `Proof` if the variant is `Full`, else `None`.
pub fn proof(&self) -> Option<&Proof<Vec<u8>>> {
match &self {
EchoContent::Hash(_) => None,
EchoContent::Full(p) => Some(p),
}
}
}
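
For intuition, a simplified stand-in for the `echos` bookkeeping above, with proofs elided to their root hashes (all names are illustrative; the real map stores a `Proof<Vec<u8>>` inside `EchoContent::Full`):

```rust
use std::collections::BTreeMap;

type Digest = [u8; 32];

/// Simplified stand-in for `EchoContent`: the proof is reduced to its root hash.
enum Echoed {
    Hash(Digest), // the sender sent `EchoHash(h)`
    Full(Digest), // the sender sent a full `Echo` whose proof has root `h`
}

/// Mirrors `count_echos` / `count_echos_full`: full `Echo`s and `EchoHash`es
/// both count toward multicasting `Ready`, but only full `Echo`s carry shards
/// usable for decoding.
fn counts(echos: &BTreeMap<u64, Echoed>, h: &Digest) -> (usize, usize) {
    let any = echos
        .values()
        .filter(|e| match e {
            Echoed::Hash(x) | Echoed::Full(x) => x == h,
        })
        .count();
    let full = echos
        .values()
        .filter(|e| match e {
            Echoed::Full(x) => x == h,
            Echoed::Hash(_) => false,
        })
        .count();
    (any, full)
}
```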


@@ -35,6 +35,9 @@ pub enum FaultKind {
/// `Broadcast` received multiple different `Echo`s from the same sender.
#[fail(display = "`Broadcast` received multiple different `Echo`s from the same sender.")]
MultipleEchos,
/// `Broadcast` received multiple different `EchoHash`s from the same sender.
#[fail(display = "`Broadcast` received multiple different `EchoHash`s from the same sender.")]
MultipleEchoHashes,
/// `Broadcast` received multiple different `Ready`s from the same sender.
#[fail(display = "`Broadcast` received multiple different `Ready`s from the same sender.")]
MultipleReadys,


@@ -17,13 +17,19 @@ pub enum Message {
Echo(Proof<Vec<u8>>),
/// Indicates that the sender knows that every node will eventually be able to decode.
Ready(Digest),
/// Indicates that this node has enough shares to decode the message with the given Merkle root.
CanDecode(Digest),
/// Indicates that the sender can send an `Echo` for the given Merkle root.
EchoHash(Digest),
}
// A random generation impl is provided for test cases. Unfortunately `#[cfg(test)]` does not work
// for integration tests.
impl Distribution<Message> for Standard {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> Message {
let message_type = *["value", "echo", "ready"].choose(rng).unwrap();
let message_type = *["value", "echo", "ready", "can_decode", "echo_hash"]
.choose(rng)
.unwrap();
// Create a random buffer for our proof.
let mut buffer: [u8; 32] = [0; 32];
@@ -37,6 +43,8 @@ impl Distribution<Message> for Standard {
"value" => Message::Value(proof),
"echo" => Message::Echo(proof),
"ready" => Message::Ready([b'r'; 32]),
"can_decode" => Message::Ready([b'r'; 32]),
"echo_hash" => Message::Ready([b'r'; 32]),
_ => unreachable!(),
}
}
@@ -48,6 +56,8 @@ impl Debug for Message {
Message::Value(ref v) => f.debug_tuple("Value").field(&HexProof(v)).finish(),
Message::Echo(ref v) => f.debug_tuple("Echo").field(&HexProof(v)).finish(),
Message::Ready(ref b) => write!(f, "Ready({:0.10})", HexFmt(b)),
Message::CanDecode(ref b) => write!(f, "CanDecode({:0.10})", HexFmt(b)),
Message::EchoHash(ref b) => write!(f, "EchoHash({:0.10})", HexFmt(b)),
}
}
}


@@ -30,7 +30,8 @@
//! `p[i] = (h, b[i], s[i])`, with which a third party can verify that `s[i]` is the `i`-th leaf of
//! the Merkle tree with root hash `h`.
//!
//! The algorithm proceeds as follows:
//!
//! The original algorithm proceeds as follows:
//! * The proposer sends `Value(p[i])` to each validator number `i`.
//! * When validator `i` receives `Value(p[i])` from the proposer, it sends it on to everyone else
//! as `Echo(p[i])`.
@@ -39,12 +40,40 @@
//! * A node that has received _2 f + 1_ `Ready`s **and** _N - 2 f_ `Echo`s with root hash `h`
//! decodes and outputs the value, and then terminates.
//!
//! Only the first valid `Value` from the proposer, and the first valid `Echo` message from every
//! validator, is handled as above. Invalid messages (where the proof isn't correct), `Values`
//! received from other nodes, and any further `Value`s and `Echo`s are ignored, and the sender is
//! reported as faulty.
We use a modified version of the algorithm that saves on bandwidth while providing the same
security guarantees as the original. The main idea of the optimized algorithm is that in the
optimistic case, a node only needs _N - 2 f_ chunks to decode a value, and every additional
`Echo` message beyond that is wasteful.
//! The modified algorithm introduces two new message types:
* `CanDecode(h)` - Indicates the node has enough chunks to recover the message with Merkle root `h`.
* `EchoHash(h)` - Indicates the node can send an `Echo(p[i])` message upon request.
//!
//! In the `Valid(p[i])` messages, the proposer distributes the chunks of the value equally among
Let _g_ be the `fault_estimate`, i.e. the estimate of the number of faulty nodes in the network
//! that we want to optimize for.
//! Note that the algorithm is still correct when more than `g` nodes are faulty.
//!
//! Define the left nodes for any node `i` as the _N - 2 f + g_ nodes to the left side of `i` after
//! arranging all nodes in a circular list.
//!
//! With the new message types and definitions, the modified algorithm works as follows:
//! * The proposer sends `Value(p[i])` to each validator number `i`.
//! * Upon receiving `Value(p[i])` from the proposer, the validator `i` sends `Echo(p[i])` to all nodes
//! on its left, and `EchoHash(h)` to the remaining validators.
//! * A validator that has received _N - f_ `Echo`s plus `EchoHash`s **or** _f + 1_ `Ready`s with root hash `h`,
//! sends `Ready(h)` to everyone.
//! * A validator that has received _N - 2 f_ `Echo`s with root hash `h`, sends `CanDecode(h)` to all nodes
//! who haven't sent them a full `Echo` message.
//! * A validator that has received _2 f + 1_ `Ready`s with root hash `h` sends a full `Echo` message to the
//! remaining nodes who haven't sent them `CanDecode(h)`.
//! * A node that has received _2 f + 1_ `Ready`s **and** _N - 2 f_ `Echo`s with root hash `h`
//! decodes and outputs the value, and then terminates.
//!
Only the first valid `Value` from the proposer, and the first valid `Echo` and `EchoHash` messages from every
validator, are handled as above. Invalid messages (where the proof isn't correct), `Value`s
received from other nodes, and any further `Value`s, `Echo`s and `EchoHash`s are ignored, and the sender is
reported as faulty. A node may receive multiple `CanDecode`s with different root hashes.
//!
//! In the `Value(p[i])` messages, the proposer distributes the chunks of the value equally among
//! all validators, along with a proof to verify that all chunks are leaves of the same Merkle tree
//! with root hash `h`.
//!
@@ -53,11 +82,23 @@
//! value when the algorithm completes: Every node that receives at least _N - 2 f_ valid `Echo`s
//! with root hash `h` can decode the value.
//!
An `EchoHash(h)` indicates that the validator `i` has received its chunk of the value from the
proposer and can provide the full `Echo` message later upon request. Since a node requires only
_N - 2 f_ valid `Echo` messages to reconstruct the value, sending all the extra `Echo` messages
in the optimistic case is wasteful, as the `Echo` message is considerably larger than the
constant-sized `EchoHash` message.
//!
//! A `CanDecode(h)` indicates that validator `i` has enough chunks to reconstruct the value. This is
//! to indicate to the nodes that have sent it only an `EchoHash` that they need not send the full `Echo`
//! message. In the optimistic case, there need not be any additional `Echo` message. However, a delay
//! in receiving the `CanDecode` message or not enough chunks available to decode may lead to additional
//! `Echo` messages being sent.
//!
//! A validator sends `Ready(h)` as soon as it knows that everyone will eventually be able to
//! decode the value with root hash `h`. Either of the two conditions in the third point above is
//! sufficient for that:
//! * If it has received _N - f_ `Echo`s with `h`, it knows that at least _N - 2 f_ **correct**
//! validators have multicast an `Echo` with `h`, and therefore everyone will
//! * If it has received _N - f_ `Echo`s or `EchoHash`s with `h`, it knows that at least _N - 2 f_
//! **correct** validators have multicast an `Echo` or `EchoHash` with `h`, and therefore everyone will
//! eventually receive at least _N - 2 f_ valid ones. So it knows that everyone will be able to
//! decode, and can send `Ready(h)`.
//! Moreover, since every correct validator only sends one kind of `Echo` message, there is no
@@ -78,6 +119,13 @@
//! _2 f + 1_ `Ready`s **and** _N - 2 f_ `Echo`s with root hash `h`), we know that
//! everyone else will eventually satisfy it, too. So at that point, we can output and terminate.
//!
In the improved algorithm, with _f = 0_ and _g = 0_, we can get almost 66% savings in bandwidth.
With `f` faulty nodes and _g = f_, we can get almost a 33% reduction in bandwidth.
With `f` faulty nodes and _g = 2 f_, it is essentially the original algorithm plus the `CanDecode`
messages. Note that the bandwidth savings numbers are only upper bounds and can be achieved only in an ideal
network where every node receives every `CanDecode` message before sending `Ready`. The practical
savings in bandwidth are much smaller but still significant.
//!
//!
//! ## Example
//!
@@ -168,6 +216,12 @@
//! on_step(*id, step, &mut messages, &mut finished_nodes);
//! }
//! }
//! Target::AllExcept(exclude) => {
//! for (id, node) in nodes.iter_mut().filter(|&(id, _)| !exclude.contains(id)) {
//! let step = node.handle_message(&source, message.clone())?;
//! on_step(*id, step, &mut messages, &mut finished_nodes);
//! }
//! }
//! Target::Node(id) => {
//! let step = {
//! let node = nodes.get_mut(&id).unwrap();
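
The thresholds described in the module docs above, restated as a compact sketch (a plain helper, not library API; `n` and `f` as in `NetworkInfo`):

```rust
/// Message-count thresholds for `n` validators tolerating `f` faults,
/// as described in the broadcast docs. Illustrative only.
struct Thresholds {
    ready: usize,      // `Echo`s plus `EchoHash`es with root `h` before multicasting `Ready(h)`
    can_decode: usize, // full `Echo`s with root `h` before sending `CanDecode(h)`
    output: usize,     // matching `Ready`s (with `can_decode` full `Echo`s) before output
}

fn thresholds(n: usize, f: usize) -> Thresholds {
    Thresholds {
        ready: n - f,          // N - f; alternatively, f + 1 `Ready`s also trigger `Ready`
        can_decode: n - 2 * f, // N - 2f, i.e. the data shard count
        output: 2 * f + 1,     // 2f + 1
    }
}
```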


@@ -1,3 +1,5 @@
use std::collections::BTreeSet;
/// Message sent by a given source.
#[derive(Clone, Debug)]
pub struct SourcedMessage<M, N> {
@@ -14,6 +16,10 @@ pub enum Target<N> {
All,
/// The message must be sent to the node with the given ID.
Node(N),
/// The message must be sent to all remote nodes except the passed nodes.
/// Useful for sending messages to observer nodes that aren't
/// present in a node's `all_ids()` list.
AllExcept(BTreeSet<N>),
}
impl<N> Target<N> {
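
A small usage sketch for the new variant, relying on the existing `Target::message` helper that wraps a message into a `TargetedMessage` (the same call the Broadcast diff above uses); the function and ids are illustrative:

```rust
use std::collections::BTreeSet;

use hbbft::{Target, TargetedMessage};

/// Wrap `msg` for delivery to every remote node except those in `exclude`,
/// e.g. to reach observer nodes absent from a validator's `all_ids()` list.
fn to_all_except<M>(msg: M, exclude: BTreeSet<u64>) -> TargetedMessage<M, u64> {
    Target::AllExcept(exclude).message(msg)
}
```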


@@ -82,7 +82,7 @@ impl<N: NodeIdT> NetworkInfo<N> {
/// ID of all nodes in the network.
#[inline]
pub fn all_ids(&self) -> impl Iterator<Item = &N> {
pub fn all_ids(&self) -> impl Iterator<Item = &N> + Clone {
self.public_keys.keys()
}


@@ -283,6 +283,30 @@
}
}
}
Target::AllExcept(exclude) => {
let is_accepted = |&them| msg.message.is_accepted(them, max_future_epochs);
let is_premature = |&them| msg.message.is_premature(them, max_future_epochs);
let is_obsolete = |&them| msg.message.is_obsolete(them);
let filtered_nodes: BTreeMap<_, _> = peer_epochs
.iter()
.filter(|(id, _)| !exclude.contains(id))
.map(|(k, v)| (k.clone(), *v))
.collect();
if filtered_nodes.values().all(is_accepted) {
passed_msgs.push(msg);
} else {
// The `Target::AllExcept` message is split into two sets of point messages: those
// which can be sent without delay and those which should be postponed.
for (id, them) in &filtered_nodes {
if is_premature(them) {
deferred_msgs.push((id.clone(), msg.message.clone()));
} else if !is_obsolete(them) {
passed_msgs
.push(Target::Node(id.clone()).message(msg.message.clone()));
}
}
}
}
}
}
self.messages.extend(passed_msgs);
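
The same epoch-based split in miniature, with epochs as plain integers (a sketch; the real `is_accepted`/`is_premature`/`is_obsolete` predicates are more involved):

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Sketch: split an `AllExcept` send into peers that can receive the message
/// now and peers for which it is premature; obsolete recipients are dropped.
fn split_sends(
    peer_epochs: &BTreeMap<u64, u64>, // peer id -> last known epoch
    exclude: &BTreeSet<u64>,
    msg_epoch: u64,
    max_future_epochs: u64,
) -> (Vec<u64>, Vec<u64>) {
    let mut pass = Vec::new();
    let mut defer = Vec::new();
    for (&id, &them) in peer_epochs.iter().filter(|&(id, _)| !exclude.contains(id)) {
        if msg_epoch > them + max_future_epochs {
            defer.push(id); // premature: postpone until the peer catches up
        } else if msg_epoch >= them {
            pass.push(id); // accepted: send as a point message now
        } // else: obsolete for this peer; drop it
    }
    (pass, defer)
}
```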