broadcast READY message handler ported from the older version

This commit is contained in:
Vladimir Komendantskiy 2018-04-25 21:00:22 +01:00
parent 8cd1f4748d
commit db06e75d4e
1 changed files with 50 additions and 1 deletions

View File

@ -208,7 +208,56 @@ impl Broadcast {
}
},
_ => Err(Error::NotImplemented).map_err(E::from)
BroadcastMessage::Ready(ref hash) => {
// Update the number Ready has been received with this
// hash.
*state.readys.entry(hash.to_vec()).or_insert(1) += 1;
// Check that the root hash matches.
if let Some(ref h) = state.root_hash.clone() {
let ready_num = *state.readys.get(h).unwrap_or(&0);
let mut outgoing = VecDeque::new();
// Upon receiving f + 1 matching Ready(h) messages,
// if Ready has not yet been sent, multicast
// Ready(h).
if (ready_num == self.num_faulty_nodes + 1) &&
!state.ready_sent
{
// Enqueue a broadcast of a ready message.
outgoing.push_back(RemoteMessage {
node: RemoteNode::All,
message: Message::Broadcast(
BroadcastMessage::Ready(h.to_vec()))
});
}
// Upon receiving 2f + 1 matching Ready(h) messages,
// wait for N 2f Echo messages, then decode v.
if ready_num > 2 * self.num_faulty_nodes {
// Wait for N - 2f Echo messages, then decode v.
if state.echo_num >=
self.num_nodes - 2 * self.num_faulty_nodes
{
let value = decode_from_shards(
&mut state.leaf_values,
&self.coding,
self.data_shard_num, h)?;
tx.send(QMessage::Local(LocalMessage {
dst: Algorithm::CommonSubset,
message: AlgoMessage::Broadcast(value)
})).map_err(Error::from)?;
}
else {
state.ready_to_decode = true;
}
}
Ok(MessageLoopState::Processing(outgoing))
}
else { no_outgoing }
}
}}
else {
Err(Error::UnexpectedMessage).map_err(E::from)