diff --git a/examples/network/messaging.rs b/examples/network/messaging.rs index 985bf3c..80e332e 100644 --- a/examples/network/messaging.rs +++ b/examples/network/messaging.rs @@ -2,6 +2,7 @@ use crossbeam::thread::{Scope, ScopedJoinHandle}; use crossbeam_channel::{self, bounded, select, unbounded, Receiver, Sender}; use hbbft::{SourcedMessage, Target, TargetedMessage}; +use std::collections::BTreeSet; /// The queue functionality for messages sent between algorithm instances. /// The messaging struct allows for targeted message exchange between comms @@ -107,7 +108,7 @@ impl Messaging { select! { recv(rx_from_algo) -> tm => { if let Ok(tm) = tm { - match tm.target { + match &tm.target { Target::All => { // Send the message to all remote nodes, stopping at the first // error. @@ -120,9 +121,26 @@ impl Messaging { } }).map_err(Error::from); }, + Target::AllExcept(exclude) => { + // Send the message to all remote nodes not in `exclude`, stopping at the first + // error. + let filtered_txs: Vec<_> = (0..txs_to_comms.len()) + .collect::<BTreeSet<_>>() + .difference(exclude) + .cloned() + .collect(); + result = filtered_txs.iter() + .fold(Ok(()), |result, i| { + if result.is_ok() { + txs_to_comms[*i].send(tm.message.clone()) + } else { + result + } + }).map_err(Error::from); + }, Target::Node(i) => { - result = if i < txs_to_comms.len() { - txs_to_comms[i].send(tm.message) + result = if *i < txs_to_comms.len() { + txs_to_comms[*i].send(tm.message) .map_err(Error::from) } else { Err(Error::NoSuchTarget) diff --git a/examples/simulation.rs b/examples/simulation.rs index 78eefcb..909206c 100644 --- a/examples/simulation.rs +++ b/examples/simulation.rs @@ -273,7 +273,7 @@ where Q: IntoIterator>, { for ts_msg in msgs { - match ts_msg.target { + match &ts_msg.target { Target::All => { for node in self.nodes.values_mut() { if node.id != ts_msg.sender_id { @@ -281,6 +281,13 @@ where } } } + Target::AllExcept(exclude) => { + for node in self.nodes.values_mut().filter(|n| 
!exclude.contains(&n.id)) { + if node.id != ts_msg.sender_id { + node.add_message(ts_msg.clone()) + } + } + } Target::Node(to_id) => { if let Some(node) = self.nodes.get_mut(&to_id) { node.add_message(ts_msg); diff --git a/hbbft_testing/src/lib.rs b/hbbft_testing/src/lib.rs index 6fe4cc2..7c57cee 100644 --- a/hbbft_testing/src/lib.rs +++ b/hbbft_testing/src/lib.rs @@ -257,6 +257,22 @@ where message_count = message_count.saturating_add(1); } + dest.push_back(NetworkMessage::new( + stepped_id.clone(), + tmsg.message.clone(), + to.clone(), + )); + } + } + hbbft::Target::AllExcept(exclude) => { + for to in nodes + .keys() + .filter(|&to| to != &stepped_id && !exclude.contains(to)) + { + if !faulty { + message_count = message_count.saturating_add(1); + } + dest.push_back(NetworkMessage::new( stepped_id.clone(), tmsg.message.clone(), diff --git a/src/broadcast/broadcast.rs b/src/broadcast/broadcast.rs index e57d250..e241c93 100644 --- a/src/broadcast/broadcast.rs +++ b/src/broadcast/broadcast.rs @@ -1,4 +1,5 @@ use std::collections::BTreeMap; +use std::collections::BTreeSet; use std::sync::Arc; use std::{fmt, result}; @@ -28,14 +29,23 @@ pub struct Broadcast { coding: Coding, /// If we are the proposer: whether we have already sent the `Value` messages with the shards. value_sent: bool, - /// Whether we have already multicast `Echo`. + /// Whether we have already sent `Echo` to all nodes who haven't sent `CanDecode`. echo_sent: bool, /// Whether we have already multicast `Ready`. ready_sent: bool, + /// Whether we have already sent `EchoHash` to the right nodes. + echo_hash_sent: bool, + /// Whether we have already sent `CanDecode` for the given hash. + can_decode_sent: BTreeSet, /// Whether we have already output a value. decided: bool, - /// The proofs we have received via `Echo` messages, by sender ID. - echos: BTreeMap>>, + /// Number of faulty nodes to optimize performance for. 
+ fault_estimate: usize, + /// The hashes and proofs we have received via `Echo` and `EchoHash` messages, by sender ID. + echos: BTreeMap, + /// The hashes we have received from nodes via `CanDecode` messages, by hash. + /// A node can receive conflicting `CanDecode`s from the same node. + can_decodes: BTreeMap>, /// The root hashes we received via `Ready` messages, by sender ID. readys: BTreeMap>, } @@ -81,6 +91,7 @@ impl Broadcast { let data_shard_num = netinfo.num_nodes() - parity_shard_num; let coding = Coding::new(data_shard_num, parity_shard_num).map_err(|_| Error::InvalidNodeCount)?; + let fault_estimate = netinfo.num_faulty(); Ok(Broadcast { netinfo, @@ -89,8 +100,12 @@ impl Broadcast { value_sent: false, echo_sent: false, ready_sent: false, + echo_hash_sent: false, + can_decode_sent: BTreeSet::new(), decided: false, + fault_estimate, echos: BTreeMap::new(), + can_decodes: BTreeMap::new(), readys: BTreeMap::new(), }) } @@ -123,6 +138,8 @@ impl Broadcast { Message::Value(p) => self.handle_value(sender_id, p), Message::Echo(p) => self.handle_echo(sender_id, p), Message::Ready(ref hash) => self.handle_ready(sender_id, hash), + Message::CanDecode(ref hash) => self.handle_can_decode(sender_id, hash), + Message::EchoHash(ref hash) => self.handle_echo_hash(sender_id, hash), } } @@ -200,8 +217,14 @@ impl Broadcast { let fault_kind = FaultKind::ReceivedValueFromNonProposer; return Ok(Fault::new(sender_id.clone(), fault_kind).into()); } - if self.echo_sent { - if self.echos.get(self.our_id()) == Some(&p) { + + match self.echos.get(self.our_id()) { + // Multiple values from proposer. + Some(val) if val.hash() != p.root_hash() => { + return Ok(Fault::new(sender_id.clone(), FaultKind::MultipleValues).into()) + } + // Already received proof. 
+ Some(EchoContent::Full(proof)) if *proof == p => { warn!( "Node {:?} received Value({:?}) multiple times from {:?}.", self.our_id(), @@ -209,24 +232,26 @@ impl Broadcast { sender_id ); return Ok(Step::default()); - } else { - return Ok(Fault::new(sender_id.clone(), FaultKind::MultipleValues).into()); } - } + _ => (), + }; // If the proof is invalid, log the faulty node behavior and ignore. if !self.validate_proof(&p, &self.our_id()) { return Ok(Fault::new(sender_id.clone(), FaultKind::InvalidProof).into()); } - // Otherwise multicast the proof in an `Echo` message, and handle it ourselves. - self.send_echo(p) + // Send the proof in an `Echo` message to left nodes + // and `EchoHash` message to right nodes and handle the response. + let echo_hash_steps = self.send_echo_hash(p.root_hash())?; + let echo_steps = self.send_echo_left(p)?; + Ok(echo_steps.join(echo_hash_steps)) } /// Handles a received `Echo` message. fn handle_echo(&mut self, sender_id: &N, p: Proof>) -> Result> { // If the sender has already sent `Echo`, ignore. - if let Some(old_p) = self.echos.get(sender_id) { + if let Some(EchoContent::Full(old_p)) = self.echos.get(sender_id) { if *old_p == p { warn!( "Node {:?} received Echo({:?}) multiple times from {:?}.", @@ -240,6 +265,14 @@ impl Broadcast { } } + // Case where we have received an earlier `EchoHash` + // message from sender_id with different root_hash. + if let Some(EchoContent::Hash(hash)) = self.echos.get(sender_id) { + if hash != p.root_hash() { + return Ok(Fault::new(sender_id.clone(), FaultKind::MultipleEchos).into()); + } + } + // If the proof is invalid, log the faulty-node behavior, and ignore. if !self.validate_proof(&p, sender_id) { return Ok(Fault::new(sender_id.clone(), FaultKind::InvalidProof).into()); @@ -248,16 +281,85 @@ impl Broadcast { let hash = *p.root_hash(); // Save the proof for reconstructing the tree later. 
- self.echos.insert(sender_id.clone(), p); + self.echos.insert(sender_id.clone(), EchoContent::Full(p)); + + let mut step = Step::default(); + + // Upon receiving `N - 2f` `Echo`s with this root hash, send `CanDecode` + if !self.can_decode_sent.contains(&hash) + && self.count_echos_full(&hash) >= self.coding.data_shard_count() + { + step.extend(self.send_can_decode(&hash)?); + } + + // Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`. + if !self.ready_sent && self.count_echos(&hash) >= self.netinfo.num_correct() { + step.extend(self.send_ready(&hash)?); + } + + // Computes output if we have required number of `Echo`s and `Ready`s + // Else returns Step::default() + if self.ready_sent { + step.extend(self.compute_output(&hash)?); + } + Ok(step) + } + + fn handle_echo_hash(&mut self, sender_id: &N, hash: &Digest) -> Result> { + // If the sender has already sent `EchoHash`, ignore. + if let Some(EchoContent::Hash(old_hash)) = self.echos.get(sender_id) { + if old_hash == hash { + warn!( + "Node {:?} received EchoHash({:?}) multiple times from {:?}.", + self.our_id(), + hash, + sender_id, + ); + return Ok(Step::default()); + } else { + return Ok(Fault::new(sender_id.clone(), FaultKind::MultipleEchoHashes).into()); + } + } + + // If the sender has already sent an `Echo` for the same hash, ignore. + if let Some(EchoContent::Full(p)) = self.echos.get(sender_id) { + if p.root_hash() == hash { + return Ok(Step::default()); + } else { + return Ok(Fault::new(sender_id.clone(), FaultKind::MultipleEchoHashes).into()); + } + } + // Save the hash for counting later. + self.echos + .insert(sender_id.clone(), EchoContent::Hash(*hash)); if self.ready_sent || self.count_echos(&hash) < self.netinfo.num_correct() { return self.compute_output(&hash); } - // Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`. self.send_ready(&hash) } + /// Handles a received `CanDecode` message. 
+ fn handle_can_decode(&mut self, sender_id: &N, hash: &Digest) -> Result> { + // Save the hash for counting later. If hash from sender_id already exists, emit a warning. + if let Some(nodes) = self.can_decodes.get(hash) { + if nodes.contains(sender_id) { + warn!( + "Node {:?} received same CanDecode({:?}) multiple times from {:?}.", + self.our_id(), + hash, + sender_id, + ); + } + } + self.can_decodes + .entry(*hash) + .or_default() + .insert(sender_id.clone()); + Ok(Step::default()) + } + /// Handles a received `Ready` message. fn handle_ready(&mut self, sender_id: &N, hash: &Digest) -> Result> { // If the sender has already sent a `Ready` before, ignore. @@ -284,21 +386,128 @@ impl Broadcast { // Enqueue a broadcast of a Ready message. step.extend(self.send_ready(hash)?); } + // Upon receiving 2f + 1 matching Ready(h) messages, send full + // `Echo` message to every node who hasn't sent us a `CanDecode` + if self.count_readys(hash) == 2 * self.netinfo.num_faulty() + 1 { + step.extend(self.send_echo_remaining(hash)?); + } + Ok(step.join(self.compute_output(hash)?)) } - /// Sends an `Echo` message and handles it. Does nothing if we are only an observer. - fn send_echo(&mut self, p: Proof>) -> Result> { - self.echo_sent = true; + /// Sends `Echo` message to all left nodes and handles it. + fn send_echo_left(&mut self, p: Proof>) -> Result> { if !self.netinfo.is_validator() { return Ok(Step::default()); } let echo_msg = Message::Echo(p.clone()); - let step: Step<_> = Target::All.message(echo_msg).into(); + let mut step = Step::default(); + // `N - 2f + g` node ids to the left of our_id (excluding our_id) + // after arranging all node ids in a circular list. 
+ let left = self + .netinfo + .all_ids() + .cycle() + .skip_while(|x| *x != self.our_id()) + .take(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate) + .skip(1); + for id in left { + let msg = Target::Node(id.clone()).message(echo_msg.clone()); + step.messages.push(msg); + } + // Send `Echo` message to all non-validating nodes. + step.extend( + Target::AllExcept(self.netinfo.all_ids().cloned().collect::>()) + .message(echo_msg) + .into(), + ); let our_id = &self.our_id().clone(); Ok(step.join(self.handle_echo(our_id, p)?)) } + /// Sends `Echo` message to remaining nodes who haven't sent `CanDecode` + fn send_echo_remaining(&mut self, hash: &Digest) -> Result> { + self.echo_sent = true; + if !self.netinfo.is_validator() { + return Ok(Step::default()); + } + + let p = match self.echos.get(self.our_id()) { + // Haven't received `Echo`. + None | Some(EchoContent::Hash(_)) => return Ok(Step::default()), + // Received `Echo` for different hash. + Some(EchoContent::Full(p)) if p.root_hash() != hash => return Ok(Step::default()), + Some(EchoContent::Full(p)) => p.clone(), + }; + + let echo_msg = Message::Echo(p); + let mut step = Step::default(); + + let senders = self.can_decodes.get(hash); + // Remaining node ids to the right of our_id + // after arranging all node ids in a circular list. + let right = self + .netinfo + .all_ids() + .cycle() + .skip_while(|x| *x != self.our_id()) + .skip(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate) + .take_while(|x| *x != self.our_id()); + let msgs = right + .filter(|id| senders.map_or(true, |s| !s.contains(id))) + .map(|id| Target::Node(id.clone()).message(echo_msg.clone())); + step.messages.extend(msgs); + Ok(step) + } + + /// Sends an `EchoHash` message and handles it. Does nothing if we are only an observer. 
+ fn send_echo_hash(&mut self, hash: &Digest) -> Result> { + self.echo_hash_sent = true; + if !self.netinfo.is_validator() { + return Ok(Step::default()); + } + let echo_hash_msg = Message::EchoHash(*hash); + let mut step = Step::default(); + // Remaining node ids to the right of our_id + // after arranging all node ids in a circular list. + let right = self + .netinfo + .all_ids() + .cycle() + .skip_while(|x| *x != self.our_id()) + .skip(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate) + .take_while(|x| *x != self.our_id()); + for id in right { + let msg = Target::Node(id.clone()).message(echo_hash_msg.clone()); + step.messages.push(msg); + } + let our_id = &self.our_id().clone(); + Ok(step.join(self.handle_echo_hash(our_id, hash)?)) + } + + /// Sends a `CanDecode` message and handles it. Does nothing if we are only an observer. + fn send_can_decode(&mut self, hash: &Digest) -> Result> { + self.can_decode_sent.insert(hash.clone()); + if !self.netinfo.is_validator() { + return Ok(Step::default()); + } + + let can_decode_msg = Message::CanDecode(*hash); + let mut step = Step::default(); + + for id in self.netinfo.all_ids() { + match self.echos.get(id) { + Some(EchoContent::Hash(_)) | None => { + let msg = Target::Node(id.clone()).message(can_decode_msg.clone()); + step.messages.push(msg); + } + _ => (), + } + } + let our_id = &self.our_id().clone(); + Ok(step.join(self.handle_can_decode(our_id, hash)?)) + } + /// Sends a `Ready` message and handles it. Does nothing if we are only an observer. 
fn send_ready(&mut self, hash: &Digest) -> Result> { self.ready_sent = true; @@ -316,7 +525,7 @@ impl Broadcast { fn compute_output(&mut self, hash: &Digest) -> Result> { if self.decided || self.count_readys(hash) <= 2 * self.netinfo.num_faulty() - || self.count_echos(hash) < self.coding.data_shard_count() + || self.count_echos_full(hash) < self.coding.data_shard_count() { return Ok(Step::default()); } @@ -326,13 +535,16 @@ impl Broadcast { .netinfo .all_ids() .map(|id| { - self.echos.get(id).and_then(|p| { - if p.root_hash() == hash { - Some(p.value().clone().into_boxed_slice()) - } else { - None - } - }) + self.echos + .get(id) + .and_then(EchoContent::proof) + .and_then(|p| { + if p.root_hash() == hash { + Some(p.value().clone().into_boxed_slice()) + } else { + None + } + }) }) .collect(); if let Some(value) = self.decode_from_shards(&mut leaf_values, hash) { @@ -392,14 +604,20 @@ impl Broadcast { self.netinfo.node_index(id) == Some(p.index()) && p.validate(self.netinfo.num_nodes()) } - /// Returns the number of nodes that have sent us an `Echo` message with this hash. - fn count_echos(&self, hash: &Digest) -> usize { + /// Returns the number of nodes that have sent us a full `Echo` message with this hash. + fn count_echos_full(&self, hash: &Digest) -> usize { self.echos .values() + .filter_map(EchoContent::proof) .filter(|p| p.root_hash() == hash) .count() } + /// Returns the number of nodes that have sent us an `Echo` or `EchoHash` message with this hash. + fn count_echos(&self, hash: &Digest) -> usize { + self.echos.values().filter(|v| v.hash() == hash).count() + } + /// Returns the number of nodes that have sent us a `Ready` message with this hash. fn count_readys(&self, hash: &Digest) -> usize { self.readys @@ -473,3 +691,30 @@ impl Coding { } } } + +/// Content for `EchoHash` and `Echo` messages. +#[derive(Debug)] +enum EchoContent { + /// `EchoHash` message. 
+ Hash(Digest), + /// `Echo` message + Full(Proof>), +} + +impl EchoContent { + /// Returns hash of the message from either message types. + pub fn hash(&self) -> &Digest { + match &self { + EchoContent::Hash(h) => h, + EchoContent::Full(p) => p.root_hash(), + } + } + + /// Returns Proof if type is Full else returns None. + pub fn proof(&self) -> Option<&Proof>> { + match &self { + EchoContent::Hash(_) => None, + EchoContent::Full(p) => Some(p), + } + } +} diff --git a/src/broadcast/error.rs b/src/broadcast/error.rs index 47d33b3..bd0bf4c 100644 --- a/src/broadcast/error.rs +++ b/src/broadcast/error.rs @@ -35,6 +35,9 @@ pub enum FaultKind { /// `Broadcast` received multiple different `Echo`s from the same sender. #[fail(display = "`Broadcast` received multiple different `Echo`s from the same sender.")] MultipleEchos, + /// `Broadcast` received multiple different `EchoHash`s from the same sender. + #[fail(display = "`Broadcast` received multiple different `EchoHash`s from the same sender.")] + MultipleEchoHashes, /// `Broadcast` received multiple different `Ready`s from the same sender. #[fail(display = "`Broadcast` received multiple different `Ready`s from the same sender.")] MultipleReadys, diff --git a/src/broadcast/message.rs b/src/broadcast/message.rs index 808b736..9da9877 100644 --- a/src/broadcast/message.rs +++ b/src/broadcast/message.rs @@ -17,13 +17,19 @@ pub enum Message { Echo(Proof>), /// Indicates that the sender knows that every node will eventually be able to decode. Ready(Digest), + /// Indicates that this node has enough shares to decode the message with given Merkle root. + CanDecode(Digest), + /// Indicates that sender can send an Echo for given Merkle root. + EchoHash(Digest), } // A random generation impl is provided for test cases. Unfortunately `#[cfg(test)]` does not work // for integration tests. 
impl Distribution for Standard { fn sample(&self, rng: &mut R) -> Message { - let message_type = *["value", "echo", "ready"].choose(rng).unwrap(); + let message_type = *["value", "echo", "ready", "can_decode", "echo_hash"] + .choose(rng) + .unwrap(); // Create a random buffer for our proof. let mut buffer: [u8; 32] = [0; 32]; @@ -37,6 +43,8 @@ impl Distribution for Standard { "value" => Message::Value(proof), "echo" => Message::Echo(proof), "ready" => Message::Ready([b'r'; 32]), + "can_decode" => Message::CanDecode([b'r'; 32]), + "echo_hash" => Message::EchoHash([b'r'; 32]), _ => unreachable!(), } } @@ -48,6 +56,8 @@ impl Debug for Message { match *self { Message::Value(ref v) => f.debug_tuple("Value").field(&HexProof(v)).finish(), Message::Echo(ref v) => f.debug_tuple("Echo").field(&HexProof(v)).finish(), Message::Ready(ref b) => write!(f, "Ready({:0.10})", HexFmt(b)), + Message::CanDecode(ref b) => write!(f, "CanDecode({:0.10})", HexFmt(b)), + Message::EchoHash(ref b) => write!(f, "EchoHash({:0.10})", HexFmt(b)), } } } diff --git a/src/broadcast/mod.rs b/src/broadcast/mod.rs index 3504dd4..2a93ace 100644 --- a/src/broadcast/mod.rs +++ b/src/broadcast/mod.rs @@ -30,7 +30,8 @@ //! `p[i] = (h, b[i], s[i])`, with which a third party can verify that `s[i]` is the `i`-th leaf of //! the Merkle tree with root hash `h`. //! -//! The algorithm proceeds as follows: +//! +//! The original algorithm proceeds as follows: //! * The proposer sends `Value(p[i])` to each validator number `i`. //! * When validator `i` receives `Value(p[i])` from the proposer, it sends it on to everyone else //! as `Echo(p[i])`. @@ -39,12 +40,40 @@ //! * A node that has received _2 f + 1_ `Ready`s **and** _N - 2 f_ `Echo`s with root hash `h` //! decodes and outputs the value, and then terminates. //! -//! Only the first valid `Value` from the proposer, and the first valid `Echo` message from every -//! validator, is handled as above. Invalid messages (where the proof isn't correct), `Values` -//! 
received from other nodes, and any further `Value`s and `Echo`s are ignored, and the sender is -//! reported as faulty. +//! We use a modified version of the algorithm to save on bandwidth which provides the same +//! security guarantees as the original. The main idea of the optimized algorithm is that in the +//! optimistic case, a node only needs _N - 2 f_ chunks to decode a value and every additional +//! `Echo` message over that is wasteful. +//! The modified algorithm introduces two new message types: +//! * `CanDecode(h)` - Indicates node has enough chunks to recover message with Merkle root `h`. +//! * `EchoHash(h)` - Indicates node can send an `Echo(p[i])` message upon request. //! -//! In the `Valid(p[i])` messages, the proposer distributes the chunks of the value equally among +//! Let _g_ be the `fault_estimate` i.e. the estimate of number of faulty nodes in the network +//! that we want to optimize for. +//! Note that the algorithm is still correct when more than `g` nodes are faulty. +//! +//! Define the left nodes for any node `i` as the _N - 2 f + g_ nodes to the left side of `i` after +//! arranging all nodes in a circular list. +//! +//! With the new message types and definitions, the modified algorithm works as follows: +//! * The proposer sends `Value(p[i])` to each validator number `i`. +//! * Upon receiving `Value(p[i])` from the proposer, the validator `i` sends `Echo(p[i])` to all nodes +//! on its left, and `EchoHash(h)` to the remaining validators. +//! * A validator that has received _N - f_ `Echo`s plus `EchoHash`s **or** _f + 1_ `Ready`s with root hash `h`, +//! sends `Ready(h)` to everyone. +//! * A validator that has received _N - 2 f_ `Echo`s with root hash `h`, sends `CanDecode(h)` to all nodes +//! who haven't sent them a full `Echo` message. +//! * A validator that has received _2 f + 1_ `Ready`s with root hash `h` sends a full `Echo` message to the +//! remaining nodes who haven't sent them `CanDecode(h)`. +//! 
* A node that has received _2 f + 1_ `Ready`s **and** _N - 2 f_ `Echo`s with root hash `h` +//! decodes and outputs the value, and then terminates. +//! +//! Only the first valid `Value` from the proposer, and the first valid `Echo` and `EchoHash` message from every +//! validator, is handled as above. Invalid messages (where the proof isn't correct), `Values` +//! received from other nodes, and any further `Value`s, `Echo`s and `EchoHash`s are ignored, and the sender is +//! reported as faulty. A node may receive multiple `CanDecode`s with different root hash. +//! +//! In the `Value(p[i])` messages, the proposer distributes the chunks of the value equally among //! all validators, along with a proof to verify that all chunks are leaves of the same Merkle tree //! with root hash `h`. //! @@ -53,11 +82,23 @@ //! value when the algorithm completes: Every node that receives at least _N - 2 f_ valid `Echo`s //! with root hash `h` can decode the value. //! +//! An `EchoHash(h)` indicates that the validator `i` has received its chunk of the value from the +//! proposer and can provide the full `Echo` message later upon request. Since a node requires only +//! _N - 2 f_ valid `Echo` messages to reconstruct the value, sending all the extra `Echo` messages +//! in the optimistic case is wasteful since the `Echo` message is considerably larger than the +//! constant sized `EchoHash` message. +//! +//! A `CanDecode(h)` indicates that validator `i` has enough chunks to reconstruct the value. This is +//! to indicate to the nodes that have sent it only an `EchoHash` that they need not send the full `Echo` +//! message. In the optimistic case, there need not be any additional `Echo` message. However, a delay +//! in receiving the `CanDecode` message or not enough chunks available to decode may lead to additional +//! `Echo` messages being sent. +//! //! A validator sends `Ready(h)` as soon as it knows that everyone will eventually be able to //! 
decode the value with root hash `h`. Either of the two conditions in the third point above is //! sufficient for that: -//! * If it has received _N - f_ `Echo`s with `h`, it knows that at least _N - 2 f_ **correct** -//! validators have multicast an `Echo` with `h`, and therefore everyone will +//! * If it has received _N - f_ `Echo`s or `EchoHash`s with `h`, it knows that at least _N - 2 f_ +//! **correct** validators have multicast an `Echo` or `EchoHash` with `h`, and therefore everyone will //! eventually receive at least _N - 2 f_ valid ones. So it knows that everyone will be able to //! decode, and can send `Ready(h)`. //! Moreover, since every correct validator only sends one kind of `Echo` message, there is no @@ -78,6 +119,13 @@ //! _2 f + 1_ `Ready`s **and** _N - 2 f_ `Echo`s with root hash `h`), we know that //! everyone else will eventually satisfy it, too. So at that point, we can output and terminate. //! +//! In the improved algorithm, with _f = 0_ and _g = 0_, we can get almost 66% savings in bandwidth. +//! With `f` faulty nodes and _g = f_, we can get almost 33% reduction in bandwidth. +//! With `f` faulty nodes and _g = 2 f_, it is essentially the original algorithm plus the `CanDecode` +//! messages. Note that the bandwidth savings numbers are only upper bounds and can happen only in an ideal +//! network where every node receives every `CanDecode` message before sending `Ready`. The practical +//! savings in bandwidth are much smaller but still significant. +//! //! //! ## Example //! @@ -168,6 +216,12 @@ //! on_step(*id, step, &mut messages, &mut finished_nodes); //! } //! } +//! Target::AllExcept(exclude) => { +//! for (id, node) in nodes.iter_mut().filter(|&(id, _)| !exclude.contains(id)) { +//! let step = node.handle_message(&source, message.clone())?; +//! on_step(*id, step, &mut messages, &mut finished_nodes); +//! } +//! } //! Target::Node(id) => { //! let step = { //! 
let node = nodes.get_mut(&id).unwrap(); diff --git a/src/messaging.rs b/src/messaging.rs index 87fb3d8..24d4a57 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeSet; + /// Message sent by a given source. #[derive(Clone, Debug)] pub struct SourcedMessage { @@ -14,6 +16,10 @@ pub enum Target { All, /// The message must be sent to the node with the given ID. Node(N), + /// The message must be sent to all remote nodes except the passed nodes. + /// Useful for sending messages to observer nodes that aren't + /// present in a node's `all_ids()` list. + AllExcept(BTreeSet), } impl Target { diff --git a/src/network_info.rs b/src/network_info.rs index 81ccde1..2c1a8fb 100644 --- a/src/network_info.rs +++ b/src/network_info.rs @@ -82,7 +82,7 @@ impl NetworkInfo { /// ID of all nodes in the network. #[inline] - pub fn all_ids(&self) -> impl Iterator { + pub fn all_ids(&self) -> impl Iterator + Clone { self.public_keys.keys() } diff --git a/src/traits.rs b/src/traits.rs index b434fb9..529cbc6 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -283,6 +283,30 @@ where } } } + Target::AllExcept(exclude) => { + let is_accepted = |&them| msg.message.is_accepted(them, max_future_epochs); + let is_premature = |&them| msg.message.is_premature(them, max_future_epochs); + let is_obsolete = |&them| msg.message.is_obsolete(them); + let filtered_nodes: BTreeMap<_, _> = peer_epochs + .iter() + .filter(|(id, _)| !exclude.contains(id)) + .map(|(k, v)| (k.clone(), *v)) + .collect(); + if filtered_nodes.values().all(is_accepted) { + passed_msgs.push(msg); + } else { + // The `Target::AllExcept` message is split into two sets of point messages: those + // which can be sent without delay and those which should be postponed. 
+ for (id, them) in &filtered_nodes { + if is_premature(them) { + deferred_msgs.push((id.clone(), msg.message.clone())); + } else if !is_obsolete(them) { + passed_msgs + .push(Target::Node(id.clone()).message(msg.message.clone())); + } + } + } + } } } self.messages.extend(passed_msgs);