simplified Step::defer_messages

This commit is contained in:
Vladimir Komendantskiy 2018-11-05 13:38:02 +00:00
parent 3deb5f1bce
commit 52cbde42f6
1 changed files with 22 additions and 33 deletions

View File

@ -252,45 +252,34 @@ where
<D as DistAlgorithm>::NodeId: 'i,
{
let messages = &mut self.messages;
let pass =
|TargetedMessage { target, message }: &TargetedMessage<D::Message, D::NodeId>| {
match target {
Target::All => peer_epochs
.values()
.all(|&them| message.is_accepted(them, max_future_epochs)),
Target::Node(id) => peer_epochs
.get(&id)
.map_or(false, |&them| message.is_accepted(them, max_future_epochs)),
}
};
// `Target::All` messages contained in the result of the partitioning are analyzed further
// and each split into two sets of point messages: those which can be sent without delay and
// those which should be postponed.
let mut deferred_msgs: Vec<(D::NodeId, D::Message)> = Vec::new();
let mut passed_msgs: Vec<_> = Vec::new();
for msg in messages.drain(..) {
if pass(&msg) {
passed_msgs.push(msg);
} else {
let m = msg.message;
match msg.target {
Target::Node(ref id) => {
let defer = {
let lagging = |&them| {
!(m.is_accepted(them, max_future_epochs) || m.is_obsolete(them))
};
peer_epochs.get(&id).map_or(true, lagging)
};
if defer {
deferred_msgs.push((id.clone(), m));
match msg.target.clone() {
Target::Node(id) => {
if let Some(&them) = peer_epochs.get(&id) {
if msg.message.is_accepted(them, max_future_epochs) {
passed_msgs.push(msg);
} else if !msg.message.is_obsolete(them) {
deferred_msgs.push((id, msg.message));
}
}
Target::All => {
}
Target::All => {
if peer_epochs
.values()
.all(|&them| msg.message.is_accepted(them, max_future_epochs))
{
passed_msgs.push(msg);
} else {
// The `Target::All` message is split into two sets of point messages: those
// which can be sent without delay and those which should be postponed.
for (id, &them) in peer_epochs {
if m.is_accepted(them, max_future_epochs) {
passed_msgs.push(Target::Node(id.clone()).message(m.clone()));
} else if !m.is_obsolete(them) {
deferred_msgs.push((id.clone(), m.clone()));
if msg.message.is_accepted(them, max_future_epochs) {
passed_msgs
.push(Target::Node(id.clone()).message(msg.message.clone()));
} else if !msg.message.is_obsolete(them) {
deferred_msgs.push((id.clone(), msg.message.clone()));
}
}
}