Change the `Target` variants.

`Target` now only has a `Nodes` and an `AllExcept` variant, to specify
a message's target via a whitelist or blacklist. This avoids cloning
the message content and simplifies the code in several places.
This commit is contained in:
Andreas Fackler 2019-06-18 15:04:58 +02:00 committed by Andreas Fackler
parent 7078387115
commit 0e51bb3615
15 changed files with 114 additions and 221 deletions

View File

@ -2,7 +2,6 @@
use crossbeam::thread::{Scope, ScopedJoinHandle};
use crossbeam_channel::{self, bounded, select, unbounded, Receiver, Sender};
use hbbft::{SourcedMessage, Target, TargetedMessage};
use std::collections::BTreeSet;
/// The queue functionality for messages sent between algorithm instances.
/// The messaging struct allows for targeted message exchange between comms
@ -108,43 +107,15 @@ impl<M: Send> Messaging<M> {
select! {
recv(rx_from_algo) -> tm => {
if let Ok(tm) = tm {
match &tm.target {
Target::All => {
// Send the message to all remote nodes, stopping at the first
// error.
result = txs_to_comms.iter()
.fold(Ok(()), |result, tx| {
if result.is_ok() {
tx.send(tm.message.clone())
} else {
result
}
}).map_err(Error::from);
},
Target::AllExcept(exclude) => {
// Send the message to all remote nodes not in `exclude`, stopping at the first
// error.
let filtered_txs: Vec<_> = (0..txs_to_comms.len())
.collect::<BTreeSet<_>>()
.difference(exclude)
.cloned()
.collect();
result = filtered_txs.iter()
.fold(Ok(()), |result, i| {
if result.is_ok() {
txs_to_comms[*i].send(tm.message.clone())
} else {
result
}
}).map_err(Error::from);
},
Target::Node(i) => {
result = if *i < txs_to_comms.len() {
txs_to_comms[*i].send(tm.message)
.map_err(Error::from)
} else {
Err(Error::NoSuchTarget)
};
if match tm.target {
Target::AllExcept(ref ids) => ids,
Target::Nodes(ref ids) => ids,
}.iter().any(|i| *i >= txs_to_comms.len()) {
return Err(Error::NoSuchTarget);
}
for (i, tx) in txs_to_comms.iter().enumerate() {
if tm.target.contains(&i) {
tx.send(tm.message.clone())?;
}
}
}

View File

@ -273,25 +273,9 @@ where
Q: IntoIterator<Item = TimestampedMessage<D>>,
{
for ts_msg in msgs {
match &ts_msg.target {
Target::All => {
for node in self.nodes.values_mut() {
if node.id != ts_msg.sender_id {
node.add_message(ts_msg.clone())
}
}
}
Target::AllExcept(exclude) => {
for node in self.nodes.values_mut().filter(|n| !exclude.contains(&n.id)) {
if node.id != ts_msg.sender_id {
node.add_message(ts_msg.clone())
}
}
}
Target::Node(to_id) => {
if let Some(node) = self.nodes.get_mut(&to_id) {
node.add_message(ts_msg);
}
for node in self.nodes.values_mut() {
if ts_msg.target.contains(&node.id) && node.id != ts_msg.sender_id {
node.add_message(ts_msg.clone())
}
}
}

View File

@ -237,9 +237,8 @@ where
// Queue all messages for processing.
for tmsg in &step.messages {
match &tmsg.target {
// Single target message.
hbbft::Target::Node(to) => {
for to in nodes.keys() {
if tmsg.target.contains(to) && to != &stepped_id {
if !faulty {
message_count = message_count.saturating_add(1);
}
@ -250,36 +249,6 @@ where
to.clone(),
));
}
// Broadcast messages get expanded into multiple direct messages.
hbbft::Target::All => {
for to in nodes.keys().filter(|&to| to != &stepped_id) {
if !faulty {
message_count = message_count.saturating_add(1);
}
dest.push_back(NetworkMessage::new(
stepped_id.clone(),
tmsg.message.clone(),
to.clone(),
));
}
}
hbbft::Target::AllExcept(exclude) => {
for to in nodes
.keys()
.filter(|&to| to != &stepped_id && !exclude.contains(to))
{
if !faulty {
message_count = message_count.saturating_add(1);
}
dest.push_back(NetworkMessage::new(
stepped_id.clone(),
tmsg.message.clone(),
to.clone(),
));
}
}
}
}

View File

@ -386,7 +386,7 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
if !self.netinfo.is_validator() {
return Ok(Step::default());
}
let step: Step<N> = Target::All
let step: Step<N> = Target::all()
.message(content.clone().with_epoch(self.epoch))
.into();
let our_id = &self.our_id().clone();
@ -462,7 +462,7 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
debug!("{}: decision: {}", self, b);
if self.netinfo.is_validator() {
let msg = MessageContent::Term(b).with_epoch(self.epoch + 1);
step.messages.push(Target::All.message(msg));
step.messages.push(Target::all().message(msg));
}
step
}

View File

@ -141,7 +141,7 @@ impl<N: NodeIdT> SbvBroadcast<N> {
if !self.netinfo.is_validator() {
return self.try_output();
}
let step: Step<_> = Target::All.message(msg.clone()).into();
let step: Step<_> = Target::all().message(msg.clone()).into();
let our_id = &self.netinfo.our_id().clone();
Ok(step.join(self.handle_message(our_id, &msg)?))
}

View File

@ -202,7 +202,7 @@ impl<N: NodeIdT> Broadcast<N> {
result = Ok(proof);
} else {
// Rest of the proofs are sent to remote nodes.
let msg = Target::Node(id.clone()).message(Message::Value(proof));
let msg = Target::node(id.clone()).message(Message::Value(proof));
step.messages.push(msg);
}
}
@ -402,25 +402,10 @@ impl<N: NodeIdT> Broadcast<N> {
}
let echo_msg = Message::Echo(p.clone());
let mut step = Step::default();
// `N - 2f + g` node ids to the left of our_id (excluding our_id)
// after arranging all node ids in a circular list.
let left = self
.netinfo
.all_ids()
.cycle()
.skip_while(|x| *x != self.our_id())
.take(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate)
.skip(1);
for id in left {
let msg = Target::Node(id.clone()).message(echo_msg.clone());
step.messages.push(msg);
}
// Send `Echo` message to all non-validating nodes.
step.extend(
Target::AllExcept(self.netinfo.all_ids().cloned().collect::<BTreeSet<_>>())
.message(echo_msg)
.into(),
);
let right = self.right_nodes().cloned().collect();
// Send `Echo` message to all non-validating nodes and the ones on our left.
let msg = Target::AllExcept(right).message(echo_msg);
step.messages.push(msg);
let our_id = &self.our_id().clone();
Ok(step.join(self.handle_echo(our_id, p)?))
}
@ -444,19 +429,12 @@ impl<N: NodeIdT> Broadcast<N> {
let mut step = Step::default();
let senders = self.can_decodes.get(hash);
// Remaining node ids to the right of our_id
// after arranging all node ids in a circular list.
let right = self
.netinfo
.all_ids()
.cycle()
.skip_while(|x| *x != self.our_id())
.skip(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate)
.take_while(|x| *x != self.our_id());
let msgs = right
.right_nodes()
.filter(|id| senders.map_or(true, |s| !s.contains(id)))
.map(|id| Target::Node(id.clone()).message(echo_msg.clone()));
step.messages.extend(msgs);
.cloned()
.collect();
step.messages.push(Target::Nodes(right).message(echo_msg));
Ok(step)
}
@ -468,23 +446,30 @@ impl<N: NodeIdT> Broadcast<N> {
}
let echo_hash_msg = Message::EchoHash(*hash);
let mut step = Step::default();
// Remaining node ids to the right of our_id
// after arranging all node ids in a circular list.
let right = self
.netinfo
.all_ids()
.cycle()
.skip_while(|x| *x != self.our_id())
.skip(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate)
.take_while(|x| *x != self.our_id());
for id in right {
let msg = Target::Node(id.clone()).message(echo_hash_msg.clone());
step.messages.push(msg);
}
let right = self.right_nodes().cloned().collect();
let msg = Target::Nodes(right).message(echo_hash_msg);
step.messages.push(msg);
let our_id = &self.our_id().clone();
Ok(step.join(self.handle_echo_hash(our_id, hash)?))
}
/// Returns an iterator over all nodes to our right.
///
/// The nodes are arranged in a circle according to their ID, starting with our own. The first
/// _N - 2 f + g_ nodes are considered "to our left" and the rest "to our right".
///
/// These are the nodes to which we only send an `EchoHash` message in the beginning.
fn right_nodes(&self) -> impl Iterator<Item = &N> {
let our_id = self.our_id().clone();
// True for every ID except our own. Cloned below because both `skip_while` and
// `take_while` each need to own a copy of the closure.
let not_us = move |x: &&N| **x != our_id;
self.netinfo
.all_ids()
// Repeat the ID list endlessly to treat it as a circular list.
.cycle()
// Rotate the circle so that iteration starts at our own ID.
.skip_while(not_us.clone())
// Drop our own ID together with the "left" nodes that follow it. The skip count is
// _N - 2 f + g_ (`num_correct() - num_faulty() + fault_estimate`, matching the doc
// comment above — TODO confirm `num_correct()` is _N - f_ against `NetworkInfo`).
.skip(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate)
// Yield the remaining "right" nodes, stopping when the circle wraps back to us.
.take_while(not_us)
}
/// Sends a `CanDecode` message and handles it. Does nothing if we are only an observer.
fn send_can_decode(&mut self, hash: &Digest) -> Result<Step<N>> {
self.can_decode_sent.insert(hash.clone());
@ -495,15 +480,17 @@ impl<N: NodeIdT> Broadcast<N> {
let can_decode_msg = Message::CanDecode(*hash);
let mut step = Step::default();
for id in self.netinfo.all_ids() {
match self.echos.get(id) {
Some(EchoContent::Hash(_)) | None => {
let msg = Target::Node(id.clone()).message(can_decode_msg.clone());
step.messages.push(msg);
}
_ => (),
}
}
let recipients = self
.netinfo
.all_ids()
.filter(|id| match self.echos.get(id) {
Some(EchoContent::Hash(_)) | None => true,
_ => false,
})
.cloned()
.collect();
let msg = Target::Nodes(recipients).message(can_decode_msg);
step.messages.push(msg);
let our_id = &self.our_id().clone();
Ok(step.join(self.handle_can_decode(our_id, hash)?))
}
@ -515,7 +502,7 @@ impl<N: NodeIdT> Broadcast<N> {
return Ok(Step::default());
}
let ready_msg = Message::Ready(*hash);
let step: Step<_> = Target::All.message(ready_msg).into();
let step: Step<_> = Target::all().message(ready_msg).into();
let our_id = &self.our_id().clone();
Ok(step.join(self.handle_ready(our_id, hash)?))
}

View File

@ -209,27 +209,10 @@
//! message: TargetedMessage { target, message },
//! }) = messages.pop_front()
//! {
//! match target {
//! Target::All => {
//! for (id, node) in &mut nodes {
//! let step = node.handle_message(&source, message.clone())?;
//! on_step(*id, step, &mut messages, &mut finished_nodes);
//! }
//! }
//! Target::AllExcept(exclude) => {
//! for (id, node) in nodes.iter_mut().filter(|&(id, _)| !exclude.contains(id)) {
//! let step = node.handle_message(&source, message.clone())?;
//! on_step(*id, step, &mut messages, &mut finished_nodes);
//! }
//! }
//! Target::Node(id) => {
//! let step = {
//! let node = nodes.get_mut(&id).unwrap();
//! node.handle_message(&source, message)?
//! };
//! on_step(id, step, &mut messages, &mut finished_nodes);
//! }
//! };
//! for (id, node) in nodes.iter_mut().filter(|&(id, _)| target.contains(id)) {
//! let step = node.handle_message(&source, message.clone())?;
//! on_step(*id, step, &mut messages, &mut finished_nodes);
//! }
//! }
//! // Every node should output exactly once. Here we check the second half of this statement,
//! // namely that every node outputs.

View File

@ -172,7 +172,7 @@ where
}
let signed_vote = self.vote_counter.sign_vote_for(change)?.clone();
let msg = Message::SignedVote(signed_vote);
Ok(Target::All.message(msg).into())
Ok(Target::all().message(msg).into())
}
/// Casts a vote to add a node as a validator.
@ -494,7 +494,7 @@ where
self.key_gen_msg_buffer.push(signed_msg);
}
let msg = Message::KeyGen(self.era, kg_msg, sig);
Ok(Target::All.message(msg).into())
Ok(Target::all().message(msg).into())
}
/// If the current Key Generation process is ready, returns the `KeyGenState`.

View File

@ -34,7 +34,7 @@
//! New observers can only join the network after an epoch where `change` was not `None`. These
//! epochs' batches contain a `JoinPlan`, which can be sent as an invitation to the new node: The
//! `DynamicHoneyBadger` instance created from a `JoinPlan` will start as an observer in the
//! following epoch. All `Target::All` messages from that and later epochs must be sent to the new
//! following epoch. All `Target::all()` messages from that and later epochs must be sent to the new
//! node.
//!
//! Observer nodes can leave the network at any time.

View File

@ -1,4 +1,5 @@
use std::collections::BTreeSet;
use std::iter;
/// Message sent by a given source.
#[derive(Clone, Debug)]
@ -12,17 +13,26 @@ pub struct SourcedMessage<M, N> {
/// The intended recipient(s) of a message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Target<N> {
/// The message must be sent to all remote nodes.
All,
/// The message must be sent to the node with the given ID.
Node(N),
/// The message must be sent to the nodes with the given IDs.
/// It is _not_ automatically sent to observers.
Nodes(BTreeSet<N>),
/// The message must be sent to all remote nodes except the passed nodes.
/// Useful for sending messages to observer nodes that aren't
/// present in a node's `all_ids()` list.
AllExcept(BTreeSet<N>),
}
impl<N> Target<N> {
impl<N: Ord> Target<N> {
/// Creates a new `Target` addressing all peers, including observers.
///
/// This is expressed as an `AllExcept` variant whose exclusion set is empty,
/// i.e. a blacklist that blacklists nobody.
pub fn all() -> Self {
    let exclude = BTreeSet::new();
    Target::AllExcept(exclude)
}
/// Creates a new `Target` addressing a single peer.
pub fn node(node_id: N) -> Self {
Target::Nodes(iter::once(node_id).collect())
}
/// Returns a `TargetedMessage` with this target, and the given message.
pub fn message<M>(self, message: M) -> TargetedMessage<M, N> {
TargetedMessage {
@ -30,6 +40,14 @@ impl<N> Target<N> {
message,
}
}
/// Returns whether `node_id` is included in this target.
///
/// `Nodes` acts as a whitelist (membership means included), while `AllExcept`
/// acts as a blacklist (membership means excluded).
pub fn contains(&self, node_id: &N) -> bool {
    let (ids, is_whitelist) = match self {
        Target::Nodes(ids) => (ids, true),
        Target::AllExcept(ids) => (ids, false),
    };
    ids.contains(node_id) == is_whitelist
}
}
/// Message with a designated target.

View File

@ -258,7 +258,7 @@ where
.filter_map(|key| queue.remove(&key))
.flatten()
.filter(|msg| !msg.is_obsolete(epoch))
.map(|msg| Target::Node(sender_id.clone()).message(Message::Algo(msg)))
.map(|msg| Target::node(sender_id.clone()).message(Message::Algo(msg)))
.into()
}
@ -318,9 +318,8 @@ where
}
if !self.is_removed || send_last_epoch_started {
// Announce the new epoch.
Target::All
.message(Message::EpochStarted(self.algo.epoch()))
.into()
let msg = Message::EpochStarted(self.algo.epoch());
Target::all().message(msg).into()
} else {
// If removed, do not announce the new epoch to prevent peers from sending messages to
// this node.
@ -329,7 +328,7 @@ where
}
/// Removes any messages to nodes at earlier epochs from the given `Step`. This may involve
/// decomposing a `Target::All` message into `Target::Node` messages and sending some of the
/// decomposing a `Target::all()` message into `Target::Nodes` messages and sending some of the
/// resulting messages while placing onto the queue those remaining messages whose recipient is
/// currently at an earlier epoch.
fn defer_messages(&mut self, step: &mut CpStep<D>) {
@ -444,7 +443,7 @@ where
participants_after_change: BTreeSet::new(),
is_removed: false,
};
let step = Target::All.message(Message::EpochStarted(epoch)).into();
let step = Target::all().message(Message::EpochStarted(epoch)).into();
(sq, step)
}
}

View File

@ -162,7 +162,7 @@ impl<N: NodeIdT> ThresholdDecrypt<N> {
(_, _) => return Ok(step.join(self.try_output()?)), // Not a validator.
};
let our_id = self.our_id().clone();
let msg = Target::All.message(Message(share.clone()));
let msg = Target::all().message(Message(share.clone()));
step.messages.push(msg);
self.shares.insert(our_id, (idx, share));
step.extend(self.try_output()?);

View File

@ -167,7 +167,7 @@ impl<N: NodeIdT> ThresholdSign<N> {
Some(sks) => Message(sks.sign_g2(hash)),
None => return Ok(step.join(self.try_output()?)), // Not a validator.
};
step.messages.push(Target::All.message(msg.clone()));
step.messages.push(Target::all().message(msg.clone()));
let id = self.our_id().clone();
step.extend(self.handle_message(&id, msg)?);
Ok(step)

View File

@ -255,57 +255,39 @@ where
let mut passed_msgs: Vec<_> = Vec::new();
for msg in self.messages.drain(..) {
match msg.target.clone() {
Target::Node(id) => {
if let Some(&them) = peer_epochs.get(&id) {
if msg.message.is_premature(them, max_future_epochs) {
deferred_msgs.push((id, msg.message));
} else if !msg.message.is_obsolete(them) {
passed_msgs.push(msg);
}
}
}
Target::All => {
let is_accepted = |&them| msg.message.is_accepted(them, max_future_epochs);
Target::Nodes(mut ids) => {
let is_premature = |&them| msg.message.is_premature(them, max_future_epochs);
let is_obsolete = |&them| msg.message.is_obsolete(them);
if peer_epochs.values().all(is_accepted) {
passed_msgs.push(msg);
} else {
// The `Target::All` message is split into two sets of point messages: those
// which can be sent without delay and those which should be postponed.
for (id, them) in peer_epochs {
for (id, them) in peer_epochs {
if ids.contains(id) {
if is_premature(them) {
deferred_msgs.push((id.clone(), msg.message.clone()));
} else if !is_obsolete(them) {
passed_msgs
.push(Target::Node(id.clone()).message(msg.message.clone()));
ids.remove(id);
} else if is_obsolete(them) {
ids.remove(id);
}
}
}
if !ids.is_empty() {
passed_msgs.push(Target::Nodes(ids).message(msg.message));
}
}
Target::AllExcept(exclude) => {
let is_accepted = |&them| msg.message.is_accepted(them, max_future_epochs);
Target::AllExcept(mut exclude) => {
let is_premature = |&them| msg.message.is_premature(them, max_future_epochs);
let is_obsolete = |&them| msg.message.is_obsolete(them);
let filtered_nodes: BTreeMap<_, _> = peer_epochs
.iter()
.filter(|(id, _)| !exclude.contains(id))
.map(|(k, v)| (k.clone(), *v))
.collect();
if filtered_nodes.values().all(is_accepted) {
passed_msgs.push(msg);
} else {
// The `Target::AllExcept` message is split into two sets of point messages: those
// which can be sent without delay and those which should be postponed.
for (id, them) in &filtered_nodes {
for (id, them) in peer_epochs {
if !exclude.contains(id) {
if is_premature(them) {
deferred_msgs.push((id.clone(), msg.message.clone()));
} else if !is_obsolete(them) {
passed_msgs
.push(Target::Node(id.clone()).message(msg.message.clone()));
exclude.insert(id.clone());
} else if is_obsolete(them) {
exclude.insert(id.clone());
}
}
}
if exclude.len() < peer_epochs.len() {
passed_msgs.push(Target::AllExcept(exclude).message(msg.message));
}
}
}
}

View File

@ -108,7 +108,7 @@ impl Adversary<UsizeHoneyBadger> for FaultyShareAdversary {
// Send the share to remote nodes.
for proposer_id in &all_node_ids {
step.messages.push(
Target::All.message(sender_queue::Message::Algo(
Target::all().message(sender_queue::Message::Algo(
MessageContent::DecryptionShare {
proposer_id: *proposer_id,
share: threshold_decrypt::Message(share.clone()),