diff --git a/examples/network/messaging.rs b/examples/network/messaging.rs index 80e332e..0402463 100644 --- a/examples/network/messaging.rs +++ b/examples/network/messaging.rs @@ -2,7 +2,6 @@ use crossbeam::thread::{Scope, ScopedJoinHandle}; use crossbeam_channel::{self, bounded, select, unbounded, Receiver, Sender}; use hbbft::{SourcedMessage, Target, TargetedMessage}; -use std::collections::BTreeSet; /// The queue functionality for messages sent between algorithm instances. /// The messaging struct allows for targeted message exchange between comms @@ -108,43 +107,15 @@ impl Messaging { select! { recv(rx_from_algo) -> tm => { if let Ok(tm) = tm { - match &tm.target { - Target::All => { - // Send the message to all remote nodes, stopping at the first - // error. - result = txs_to_comms.iter() - .fold(Ok(()), |result, tx| { - if result.is_ok() { - tx.send(tm.message.clone()) - } else { - result - } - }).map_err(Error::from); - }, - Target::AllExcept(exclude) => { - // Send the message to all remote nodes not in `exclude`, stopping at the first - // error. - let filtered_txs: Vec<_> = (0..txs_to_comms.len()) - .collect::>() - .difference(exclude) - .cloned() - .collect(); - result = filtered_txs.iter() - .fold(Ok(()), |result, i| { - if result.is_ok() { - txs_to_comms[*i].send(tm.message.clone()) - } else { - result - } - }).map_err(Error::from); - }, - Target::Node(i) => { - result = if *i < txs_to_comms.len() { - txs_to_comms[*i].send(tm.message) - .map_err(Error::from) - } else { - Err(Error::NoSuchTarget) - }; + if match tm.target { + Target::AllExcept(ref ids) => ids, + Target::Nodes(ref ids) => ids, + }.iter().any(|i| *i >= txs_to_comms.len()) { + return Err(Error::NoSuchTarget); + } + for (i, tx) in txs_to_comms.iter().enumerate() { + if tm.target.contains(&i) { + tx.send(tm.message.clone())?; } } } diff --git a/examples/simulation.rs b/examples/simulation.rs index 70d2f75..3fdffff 100644 --- a/examples/simulation.rs +++ b/examples/simulation.rs @@ -273,25 +273,9 @@ where Q: IntoIterator>, { for ts_msg in msgs { - match &ts_msg.target { - Target::All => { - for node in self.nodes.values_mut() { - if node.id != ts_msg.sender_id { - node.add_message(ts_msg.clone()) - } - } - } - Target::AllExcept(exclude) => { - for node in self.nodes.values_mut().filter(|n| !exclude.contains(&n.id)) { - if node.id != ts_msg.sender_id { - node.add_message(ts_msg.clone()) - } - } - } - Target::Node(to_id) => { - if let Some(node) = self.nodes.get_mut(&to_id) { - node.add_message(ts_msg); - } + for node in self.nodes.values_mut() { + if ts_msg.target.contains(&node.id) && node.id != ts_msg.sender_id { + node.add_message(ts_msg.clone()) } } } diff --git a/hbbft_testing/src/lib.rs b/hbbft_testing/src/lib.rs index 7c57cee..b200848 100644 --- a/hbbft_testing/src/lib.rs +++ b/hbbft_testing/src/lib.rs @@ -237,9 +237,8 @@ where // Queue all messages for processing. for tmsg in &step.messages { - match &tmsg.target { - // Single target message. - hbbft::Target::Node(to) => { + for to in nodes.keys() { + if tmsg.target.contains(to) && to != &stepped_id { if !faulty { message_count = message_count.saturating_add(1); } @@ -250,36 +249,6 @@ where to.clone(), )); } - // Broadcast messages get expanded into multiple direct messages. - hbbft::Target::All => { - for to in nodes.keys().filter(|&to| to != &stepped_id) { - if !faulty { - message_count = message_count.saturating_add(1); - } - - dest.push_back(NetworkMessage::new( - stepped_id.clone(), - tmsg.message.clone(), - to.clone(), - )); - } - } - hbbft::Target::AllExcept(exclude) => { - for to in nodes - .keys() - .filter(|&to| to != &stepped_id && !exclude.contains(to)) - { - if !faulty { - message_count = message_count.saturating_add(1); - } - - dest.push_back(NetworkMessage::new( - stepped_id.clone(), - tmsg.message.clone(), - to.clone(), - )); - } - } } } diff --git a/src/binary_agreement/binary_agreement.rs b/src/binary_agreement/binary_agreement.rs index 8152469..ce1f543 100644 --- a/src/binary_agreement/binary_agreement.rs +++ b/src/binary_agreement/binary_agreement.rs @@ -386,7 +386,7 @@ impl BinaryAgreement { if !self.netinfo.is_validator() { return Ok(Step::default()); } - let step: Step = Target::All + let step: Step = Target::all() .message(content.clone().with_epoch(self.epoch)) .into(); let our_id = &self.our_id().clone(); @@ -462,7 +462,7 @@ impl BinaryAgreement { debug!("{}: decision: {}", self, b); if self.netinfo.is_validator() { let msg = MessageContent::Term(b).with_epoch(self.epoch + 1); - step.messages.push(Target::All.message(msg)); + step.messages.push(Target::all().message(msg)); } step } diff --git a/src/binary_agreement/sbv_broadcast.rs b/src/binary_agreement/sbv_broadcast.rs index 491fde7..7ac30f5 100644 --- a/src/binary_agreement/sbv_broadcast.rs +++ b/src/binary_agreement/sbv_broadcast.rs @@ -141,7 +141,7 @@ impl SbvBroadcast { if !self.netinfo.is_validator() { return self.try_output(); } - let step: Step<_> = Target::All.message(msg.clone()).into(); + let step: Step<_> = Target::all().message(msg.clone()).into(); let our_id = &self.netinfo.our_id().clone(); Ok(step.join(self.handle_message(our_id, &msg)?)) } diff --git a/src/broadcast/broadcast.rs b/src/broadcast/broadcast.rs index e241c93..1c33978 100644 --- a/src/broadcast/broadcast.rs +++ b/src/broadcast/broadcast.rs @@ -202,7 +202,7 @@ impl Broadcast { result = Ok(proof); } else { // Rest of the proofs are sent to remote nodes. - let msg = Target::Node(id.clone()).message(Message::Value(proof)); + let msg = Target::node(id.clone()).message(Message::Value(proof)); step.messages.push(msg); } } @@ -402,25 +402,10 @@ impl Broadcast { } let echo_msg = Message::Echo(p.clone()); let mut step = Step::default(); - // `N - 2f + g` node ids to the left of our_id (excluding our_id) - // after arranging all node ids in a circular list. - let left = self - .netinfo - .all_ids() - .cycle() - .skip_while(|x| *x != self.our_id()) - .take(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate) - .skip(1); - for id in left { - let msg = Target::Node(id.clone()).message(echo_msg.clone()); - step.messages.push(msg); - } - // Send `Echo` message to all non-validating nodes. - step.extend( - Target::AllExcept(self.netinfo.all_ids().cloned().collect::>()) - .message(echo_msg) - .into(), - ); + let right = self.right_nodes().cloned().collect(); + // Send `Echo` message to all non-validating nodes and the ones on our left. + let msg = Target::AllExcept(right).message(echo_msg); + step.messages.push(msg); let our_id = &self.our_id().clone(); Ok(step.join(self.handle_echo(our_id, p)?)) } @@ -444,19 +429,12 @@ impl Broadcast { let mut step = Step::default(); let senders = self.can_decodes.get(hash); - // Remaining node ids to the right of our_id - // after arranging all node ids in a circular list. let right = self - .netinfo - .all_ids() - .cycle() - .skip_while(|x| *x != self.our_id()) - .skip(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate) - .take_while(|x| *x != self.our_id()); - let msgs = right + .right_nodes() .filter(|id| senders.map_or(true, |s| !s.contains(id))) - .map(|id| Target::Node(id.clone()).message(echo_msg.clone())); - step.messages.extend(msgs); + .cloned() + .collect(); + step.messages.push(Target::Nodes(right).message(echo_msg)); Ok(step) } @@ -468,23 +446,30 @@ impl Broadcast { } let echo_hash_msg = Message::EchoHash(*hash); let mut step = Step::default(); - // Remaining node ids to the right of our_id - // after arranging all node ids in a circular list. - let right = self - .netinfo - .all_ids() - .cycle() - .skip_while(|x| *x != self.our_id()) - .skip(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate) - .take_while(|x| *x != self.our_id()); - for id in right { - let msg = Target::Node(id.clone()).message(echo_hash_msg.clone()); - step.messages.push(msg); - } + let right = self.right_nodes().cloned().collect(); + let msg = Target::Nodes(right).message(echo_hash_msg); + step.messages.push(msg); let our_id = &self.our_id().clone(); Ok(step.join(self.handle_echo_hash(our_id, hash)?)) } + /// Returns an iterator over all nodes to our right. + /// + /// The nodes are arranged in a circle according to their ID, starting with our own. The first + /// _N - 2 f + g_ nodes are considered "to our left" and the rest "to our right". + /// + /// These are the nodes to which we only send an `EchoHash` message in the beginning. + fn right_nodes(&self) -> impl Iterator { + let our_id = self.our_id().clone(); + let not_us = move |x: &&N| **x != our_id; + self.netinfo + .all_ids() + .cycle() + .skip_while(not_us.clone()) + .skip(self.netinfo.num_correct() - self.netinfo.num_faulty() + self.fault_estimate) + .take_while(not_us) + } + /// Sends a `CanDecode` message and handles it. Does nothing if we are only an observer. fn send_can_decode(&mut self, hash: &Digest) -> Result> { self.can_decode_sent.insert(hash.clone()); @@ -495,15 +480,17 @@ impl Broadcast { let can_decode_msg = Message::CanDecode(*hash); let mut step = Step::default(); - for id in self.netinfo.all_ids() { - match self.echos.get(id) { - Some(EchoContent::Hash(_)) | None => { - let msg = Target::Node(id.clone()).message(can_decode_msg.clone()); - step.messages.push(msg); - } - _ => (), - } - } + let recipients = self + .netinfo + .all_ids() + .filter(|id| match self.echos.get(id) { + Some(EchoContent::Hash(_)) | None => true, + _ => false, + }) + .cloned() + .collect(); + let msg = Target::Nodes(recipients).message(can_decode_msg); + step.messages.push(msg); let our_id = &self.our_id().clone(); Ok(step.join(self.handle_can_decode(our_id, hash)?)) } @@ -515,7 +502,7 @@ impl Broadcast { return Ok(Step::default()); } let ready_msg = Message::Ready(*hash); - let step: Step<_> = Target::All.message(ready_msg).into(); + let step: Step<_> = Target::all().message(ready_msg).into(); let our_id = &self.our_id().clone(); Ok(step.join(self.handle_ready(our_id, hash)?)) } diff --git a/src/broadcast/mod.rs b/src/broadcast/mod.rs index 2a93ace..927b7ae 100644 --- a/src/broadcast/mod.rs +++ b/src/broadcast/mod.rs @@ -209,27 +209,10 @@ //! message: TargetedMessage { target, message }, //! }) = messages.pop_front() //! { -//! match target { -//! Target::All => { -//! for (id, node) in &mut nodes { -//! let step = node.handle_message(&source, message.clone())?; -//! on_step(*id, step, &mut messages, &mut finished_nodes); -//! } -//! } -//! Target::AllExcept(exclude) => { -//! for (id, node) in nodes.iter_mut().filter(|&(id, _)| !exclude.contains(id)) { -//! let step = node.handle_message(&source, message.clone())?; -//! on_step(*id, step, &mut messages, &mut finished_nodes); -//! } -//! } -//! Target::Node(id) => { -//! let step = { -//! let node = nodes.get_mut(&id).unwrap(); -//! node.handle_message(&source, message)? -//! }; -//! on_step(id, step, &mut messages, &mut finished_nodes); -//! } -//! }; +//! for (id, node) in nodes.iter_mut().filter(|&(id, _)| target.contains(id)) { +//! let step = node.handle_message(&source, message.clone())?; +//! on_step(*id, step, &mut messages, &mut finished_nodes); +//! } //! } //! // Every node should output exactly once. Here we check the second half of this statement, //! // namely that every node outputs. diff --git a/src/dynamic_honey_badger/dynamic_honey_badger.rs b/src/dynamic_honey_badger/dynamic_honey_badger.rs index 2f5659e..db8752c 100644 --- a/src/dynamic_honey_badger/dynamic_honey_badger.rs +++ b/src/dynamic_honey_badger/dynamic_honey_badger.rs @@ -172,7 +172,7 @@ where } let signed_vote = self.vote_counter.sign_vote_for(change)?.clone(); let msg = Message::SignedVote(signed_vote); - Ok(Target::All.message(msg).into()) + Ok(Target::all().message(msg).into()) } /// Casts a vote to add a node as a validator. @@ -494,7 +494,7 @@ where self.key_gen_msg_buffer.push(signed_msg); } let msg = Message::KeyGen(self.era, kg_msg, sig); - Ok(Target::All.message(msg).into()) + Ok(Target::all().message(msg).into()) } /// If the current Key Generation process is ready, returns the `KeyGenState`. diff --git a/src/dynamic_honey_badger/mod.rs b/src/dynamic_honey_badger/mod.rs index b0357c5..d2ed007 100644 --- a/src/dynamic_honey_badger/mod.rs +++ b/src/dynamic_honey_badger/mod.rs @@ -34,7 +34,7 @@ //! New observers can only join the network after an epoch where `change` was not `None`. These //! epochs' batches contain a `JoinPlan`, which can be sent as an invitation to the new node: The //! `DynamicHoneyBadger` instance created from a `JoinPlan` will start as an observer in the -//! following epoch. All `Target::All` messages from that and later epochs must be sent to the new +//! following epoch. All `Target::all()` messages from that and later epochs must be sent to the new //! node. //! //! Observer nodes can leave the network at any time. diff --git a/src/messaging.rs b/src/messaging.rs index 24d4a57..48c12b3 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -1,4 +1,5 @@ use std::collections::BTreeSet; +use std::iter; /// Message sent by a given source. #[derive(Clone, Debug)] @@ -12,17 +13,26 @@ pub struct SourcedMessage { /// The intended recipient(s) of a message. #[derive(Clone, Debug, PartialEq, Eq)] pub enum Target { - /// The message must be sent to all remote nodes. - All, - /// The message must be sent to the node with the given ID. - Node(N), + /// The message must be sent to the nodes with the given IDs. + /// It is _not_ automatically sent to observers. + Nodes(BTreeSet), /// The message must be sent to all remote nodes except the passed nodes. /// Useful for sending messages to observer nodes that aren't /// present in a node's `all_ids()` list. AllExcept(BTreeSet), } -impl Target { +impl Target { + /// Creates a new `Target` addressing all peers, including observers. + pub fn all() -> Self { + Target::AllExcept(BTreeSet::new()) + } + + /// Creates a new `Target` addressing a single peer. + pub fn node(node_id: N) -> Self { + Target::Nodes(iter::once(node_id).collect()) + } + /// Returns a `TargetedMessage` with this target, and the given message. pub fn message(self, message: M) -> TargetedMessage { TargetedMessage { @@ -30,6 +40,14 @@ impl Target { message, } } + + /// Returns whether `node_id` is included in this target. + pub fn contains(&self, node_id: &N) -> bool { + match self { + Target::Nodes(ids) => ids.contains(node_id), + Target::AllExcept(ids) => !ids.contains(node_id), + } + } } /// Message with a designated target. diff --git a/src/sender_queue/mod.rs b/src/sender_queue/mod.rs index 8b0674c..3a21bc4 100644 --- a/src/sender_queue/mod.rs +++ b/src/sender_queue/mod.rs @@ -258,7 +258,7 @@ where .filter_map(|key| queue.remove(&key)) .flatten() .filter(|msg| !msg.is_obsolete(epoch)) - .map(|msg| Target::Node(sender_id.clone()).message(Message::Algo(msg))) + .map(|msg| Target::node(sender_id.clone()).message(Message::Algo(msg))) .into() } @@ -318,9 +318,8 @@ where } if !self.is_removed || send_last_epoch_started { // Announce the new epoch. - Target::All - .message(Message::EpochStarted(self.algo.epoch())) - .into() + let msg = Message::EpochStarted(self.algo.epoch()); + Target::all().message(msg).into() } else { // If removed, do not announce the new epoch to prevent peers from sending messages to // this node. @@ -329,7 +328,7 @@ where } /// Removes any messages to nodes at earlier epochs from the given `Step`. This may involve - /// decomposing a `Target::All` message into `Target::Node` messages and sending some of the + /// decomposing a `Target::all()` message into `Target::Nodes` messages and sending some of the /// resulting messages while placing onto the queue those remaining messages whose recipient is /// currently at an earlier epoch. fn defer_messages(&mut self, step: &mut CpStep) { @@ -444,7 +443,7 @@ where participants_after_change: BTreeSet::new(), is_removed: false, }; - let step = Target::All.message(Message::EpochStarted(epoch)).into(); + let step = Target::all().message(Message::EpochStarted(epoch)).into(); (sq, step) } } diff --git a/src/threshold_decrypt.rs b/src/threshold_decrypt.rs index a16dada..6b58a37 100644 --- a/src/threshold_decrypt.rs +++ b/src/threshold_decrypt.rs @@ -162,7 +162,7 @@ impl ThresholdDecrypt { (_, _) => return Ok(step.join(self.try_output()?)), // Not a validator. }; let our_id = self.our_id().clone(); - let msg = Target::All.message(Message(share.clone())); + let msg = Target::all().message(Message(share.clone())); step.messages.push(msg); self.shares.insert(our_id, (idx, share)); step.extend(self.try_output()?); diff --git a/src/threshold_sign.rs b/src/threshold_sign.rs index dbf6ee0..fa383bc 100644 --- a/src/threshold_sign.rs +++ b/src/threshold_sign.rs @@ -167,7 +167,7 @@ impl ThresholdSign { Some(sks) => Message(sks.sign_g2(hash)), None => return Ok(step.join(self.try_output()?)), // Not a validator. }; - step.messages.push(Target::All.message(msg.clone())); + step.messages.push(Target::all().message(msg.clone())); let id = self.our_id().clone(); step.extend(self.handle_message(&id, msg)?); Ok(step) diff --git a/src/traits.rs b/src/traits.rs index 529cbc6..a7e13a5 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -255,57 +255,39 @@ where let mut passed_msgs: Vec<_> = Vec::new(); for msg in self.messages.drain(..) { match msg.target.clone() { - Target::Node(id) => { - if let Some(&them) = peer_epochs.get(&id) { - if msg.message.is_premature(them, max_future_epochs) { - deferred_msgs.push((id, msg.message)); - } else if !msg.message.is_obsolete(them) { - passed_msgs.push(msg); - } - } - } - Target::All => { - let is_accepted = |&them| msg.message.is_accepted(them, max_future_epochs); + Target::Nodes(mut ids) => { let is_premature = |&them| msg.message.is_premature(them, max_future_epochs); let is_obsolete = |&them| msg.message.is_obsolete(them); - if peer_epochs.values().all(is_accepted) { - passed_msgs.push(msg); - } else { - // The `Target::All` message is split into two sets of point messages: those - // which can be sent without delay and those which should be postponed. - for (id, them) in peer_epochs { + for (id, them) in peer_epochs { + if ids.contains(id) { if is_premature(them) { deferred_msgs.push((id.clone(), msg.message.clone())); - } else if !is_obsolete(them) { - passed_msgs - .push(Target::Node(id.clone()).message(msg.message.clone())); + ids.remove(id); + } else if is_obsolete(them) { + ids.remove(id); } } } + if !ids.is_empty() { + passed_msgs.push(Target::Nodes(ids).message(msg.message)); + } } - Target::AllExcept(exclude) => { - let is_accepted = |&them| msg.message.is_accepted(them, max_future_epochs); + Target::AllExcept(mut exclude) => { let is_premature = |&them| msg.message.is_premature(them, max_future_epochs); let is_obsolete = |&them| msg.message.is_obsolete(them); - let filtered_nodes: BTreeMap<_, _> = peer_epochs - .iter() - .filter(|(id, _)| !exclude.contains(id)) - .map(|(k, v)| (k.clone(), *v)) - .collect(); - if filtered_nodes.values().all(is_accepted) { - passed_msgs.push(msg); - } else { - // The `Target::AllExcept` message is split into two sets of point messages: those - // which can be sent without delay and those which should be postponed. - for (id, them) in &filtered_nodes { + for (id, them) in peer_epochs { + if !exclude.contains(id) { if is_premature(them) { deferred_msgs.push((id.clone(), msg.message.clone())); - } else if !is_obsolete(them) { - passed_msgs - .push(Target::Node(id.clone()).message(msg.message.clone())); + exclude.insert(id.clone()); + } else if is_obsolete(them) { + exclude.insert(id.clone()); } } } + if exclude.len() < peer_epochs.len() { + passed_msgs.push(Target::AllExcept(exclude).message(msg.message)); + } } } } diff --git a/tests/honey_badger.rs b/tests/honey_badger.rs index b390590..f17e042 100644 --- a/tests/honey_badger.rs +++ b/tests/honey_badger.rs @@ -108,7 +108,7 @@ impl Adversary for FaultyShareAdversary { // Send the share to remote nodes. for proposer_id in &all_node_ids { step.messages.push( - Target::All.message(sender_queue::Message::Algo( + Target::all().message(sender_queue::Message::Algo( MessageContent::DecryptionShare { proposer_id: *proposer_id, share: threshold_decrypt::Message(share.clone()),