Add an`AllExcept(BTreeSet<N>)` type to `Target` enum to enable sending messages

to non-validators from Broadcast.
* Use `Target::AllExcept` in Broadcast to send `Echo` messages to all non-validator nodes.
* Add `AllExcept(_)` match arms for `Target` match expressions.
This commit is contained in:
pawanjay176 2019-05-14 20:16:59 +05:30
parent 461a4b1dfc
commit 7f46250057
7 changed files with 89 additions and 6 deletions

View File

@ -2,6 +2,7 @@
use crossbeam::thread::{Scope, ScopedJoinHandle};
use crossbeam_channel::{self, bounded, select, unbounded, Receiver, Sender};
use hbbft::{SourcedMessage, Target, TargetedMessage};
use std::collections::BTreeSet;
/// The queue functionality for messages sent between algorithm instances.
/// The messaging struct allows for targeted message exchange between comms
@ -107,7 +108,7 @@ impl<M: Send> Messaging<M> {
select! {
recv(rx_from_algo) -> tm => {
if let Ok(tm) = tm {
match tm.target {
match &tm.target {
Target::All => {
// Send the message to all remote nodes, stopping at the first
// error.
@ -120,9 +121,26 @@ impl<M: Send> Messaging<M> {
}
}).map_err(Error::from);
},
Target::AllExcept(known) => {
// Send the message to all remote nodes not in `known`, stopping at the first
// error.
let filtered_txs: Vec<_> = (0..txs_to_comms.len())
.collect::<BTreeSet<_>>()
.difference(known)
.cloned()
.collect();
result = filtered_txs.iter()
.fold(Ok(()), |result, i| {
if result.is_ok() {
txs_to_comms[*i].send(tm.message.clone())
} else {
result
}
}).map_err(Error::from);
},
Target::Node(i) => {
result = if i < txs_to_comms.len() {
txs_to_comms[i].send(tm.message)
result = if *i < txs_to_comms.len() {
txs_to_comms[*i].send(tm.message)
.map_err(Error::from)
} else {
Err(Error::NoSuchTarget)

View File

@ -273,7 +273,7 @@ where
Q: IntoIterator<Item = TimestampedMessage<D>>,
{
for ts_msg in msgs {
match ts_msg.target {
match &ts_msg.target {
Target::All => {
for node in self.nodes.values_mut() {
if node.id != ts_msg.sender_id {
@ -281,6 +281,13 @@ where
}
}
}
Target::AllExcept(known) => {
for node in self.nodes.values_mut().filter(|n| !known.contains(&n.id)) {
if node.id != ts_msg.sender_id {
node.add_message(ts_msg.clone())
}
}
}
Target::Node(to_id) => {
if let Some(node) = self.nodes.get_mut(&to_id) {
node.add_message(ts_msg);

View File

@ -257,6 +257,22 @@ where
message_count = message_count.saturating_add(1);
}
dest.push_back(NetworkMessage::new(
stepped_id.clone(),
tmsg.message.clone(),
to.clone(),
));
}
}
hbbft::Target::AllExcept(known) => {
for to in nodes
.keys()
.filter(|&to| to != &stepped_id && !known.contains(to))
{
if !faulty {
message_count = message_count.saturating_add(1);
}
dest.push_back(NetworkMessage::new(
stepped_id.clone(),
tmsg.message.clone(),

View File

@ -232,8 +232,8 @@ impl<N: NodeIdT> Broadcast<N> {
sender_id
);
return Ok(Step::default());
},
_ => ()
}
_ => (),
};
// If the proof is invalid, log the faulty node behavior and ignore.
@ -413,6 +413,12 @@ impl<N: NodeIdT> Broadcast<N> {
let msg = Target::Node(id.clone()).message(echo_msg.clone());
step.messages.push(msg);
}
// Send `Echo` message to all non-validating nodes.
step.extend(
Target::AllExcept(self.netinfo.all_ids().cloned().collect::<BTreeSet<_>>())
.message(echo_msg)
.into(),
);
let our_id = &self.our_id().clone();
Ok(step.join(self.handle_echo(our_id, p)?))
}

View File

@ -168,6 +168,12 @@
//! on_step(*id, step, &mut messages, &mut finished_nodes);
//! }
//! }
//! Target::AllExcept(known) => {
//! for (id, node) in nodes.iter_mut().filter(|&(id, _)| !known.contains(id)) {
//! let step = node.handle_message(&source, message.clone())?;
//! on_step(*id, step, &mut messages, &mut finished_nodes);
//! }
//! }
//! Target::Node(id) => {
//! let step = {
//! let node = nodes.get_mut(&id).unwrap();

View File

@ -1,3 +1,5 @@
use std::collections::BTreeSet;
/// Message sent by a given source.
#[derive(Clone, Debug)]
pub struct SourcedMessage<M, N> {
@ -14,6 +16,10 @@ pub enum Target<N> {
All,
/// The message must be sent to the node with the given ID.
Node(N),
/// The message must be sent to all remote nodes except the passed nodes.
/// Useful for sending messages to Observer nodes that aren't
/// present in a node's all_ids() list.
AllExcept(BTreeSet<N>),
}
impl<N> Target<N> {

View File

@ -283,6 +283,30 @@ where
}
}
}
Target::AllExcept(known) => {
let is_accepted = |&them| msg.message.is_accepted(them, max_future_epochs);
let is_premature = |&them| msg.message.is_premature(them, max_future_epochs);
let is_obsolete = |&them| msg.message.is_obsolete(them);
let filtered_nodes: BTreeMap<_, _> = peer_epochs
.iter()
.filter(|(id, _)| !known.contains(id))
.map(|(k, v)| (k.clone(), *v))
.collect();
if filtered_nodes.values().all(is_accepted) {
passed_msgs.push(msg);
} else {
// The `Target::All` message is split into two sets of point messages: those
// which can be sent without delay and those which should be postponed.
for (id, them) in &filtered_nodes {
if is_premature(them) {
deferred_msgs.push((id.clone(), msg.message.clone()));
} else if !is_obsolete(them) {
passed_msgs
.push(Target::Node(id.clone()).message(msg.message.clone()));
}
}
}
}
}
}
self.messages.extend(passed_msgs);