made broadcast handle its own echo and value messages

This commit is contained in:
Andreas Fackler 2018-05-03 08:47:07 +03:00
parent 13407c8774
commit 0f3377c8e9
2 changed files with 164 additions and 173 deletions

View File

@ -9,13 +9,16 @@ use reed_solomon_erasure::ReedSolomon;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::hash::Hash;
use std::iter;
use std::marker::{Send, Sync};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
use messaging;
use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, ProposedValue,
QMessage, RemoteMessage, RemoteNode, SourcedMessage, Target, TargetedMessage};
type MessageQueue<NodeUid> = VecDeque<TargetedBroadcastMessage<NodeUid>>;
/// A `BroadcastMessage` to be sent out, together with a target.
#[derive(Clone, Debug)]
pub struct TargetedBroadcastMessage<NodeUid> {
@ -186,10 +189,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
}
/// Processes the proposed value input by broadcasting it.
pub fn propose_value(
&self,
value: ProposedValue,
) -> Result<VecDeque<TargetedBroadcastMessage<NodeUid>>, Error> {
pub fn propose_value(&self, value: ProposedValue) -> Result<MessageQueue<NodeUid>, Error> {
let mut state = self.state.write().unwrap();
// Split the value into chunks/shards, encode them with erasure codes.
// Assemble a Merkle tree from data and parity shards. Take all proofs
@ -224,13 +224,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
fn send_shards(
&self,
mut value: ProposedValue,
) -> Result<
(
Proof<ProposedValue>,
VecDeque<TargetedBroadcastMessage<NodeUid>>,
),
Error,
> {
) -> Result<(Proof<ProposedValue>, MessageQueue<NodeUid>), Error> {
let data_shard_num = self.coding.data_shard_count();
let parity_shard_num = self.coding.parity_shard_count();
@ -305,168 +299,167 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
/// Handler of messages received from remote nodes.
pub fn handle_broadcast_message(
&self,
uid: &NodeUid,
_uid: &NodeUid, // TODO: Remove or clarify: this should probably be the sender's ID.
message: &BroadcastMessage<ProposedValue>,
) -> Result<
(
Option<ProposedValue>,
VecDeque<TargetedBroadcastMessage<NodeUid>>,
),
Error,
> {
let mut state = self.state.write().unwrap();
let no_outgoing = VecDeque::new();
// A value received. Record the value and multicast an echo.
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
let state = self.state.write().unwrap();
match message {
BroadcastMessage::Value(p) => {
if *uid != self.uid {
// Ignore value messages from unrelated remote nodes.
Ok((None, no_outgoing))
} else {
// Initialise the root hash if not already initialised.
if state.root_hash.is_none() {
state.root_hash = Some(p.root_hash.clone());
debug!(
"Node {:?} Value root hash {:?}",
self.uid,
HexBytes(&p.root_hash)
);
}
BroadcastMessage::Value(p) => self.handle_value(p, state),
BroadcastMessage::Echo(p) => self.handle_echo(p, state),
BroadcastMessage::Ready(ref hash) => self.handle_ready(hash, state),
}
}
if let Some(ref h) = state.root_hash.clone() {
if p.validate(h.as_slice()) {
// Save the leaf value for reconstructing the tree
// later.
state.leaf_values[index_of_proof(&p)] =
Some(p.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
}
}
/// Handles a received echo and verifies the proof it contains.
fn handle_value(
&self,
p: &Proof<ProposedValue>,
mut state: RwLockWriteGuard<BroadcastState>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
// Initialize the root hash if not already initialised.
if state.root_hash.is_none() {
state.root_hash = Some(p.root_hash.clone());
debug!(
"Node {:?} Value root hash {:?}",
self.uid,
HexBytes(&p.root_hash)
);
}
// Enqueue a broadcast of an echo of this proof.
let state = VecDeque::from(vec![TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: BroadcastMessage::Echo(p.clone()),
}]);
Ok((None, state))
}
}
// An echo received. Verify the proof it contains.
BroadcastMessage::Echo(p) => {
if state.root_hash.is_none() && *uid == self.uid {
state.root_hash = Some(p.root_hash.clone());
debug!("Node {:?} Echo root hash {:?}", self.uid, state.root_hash);
}
// Call validate with the root hash as argument.
if let Some(ref h) = state.root_hash.clone() {
if p.validate(h.as_slice()) {
state.echo_num += 1;
// Save the leaf value for reconstructing the
// tree later.
state.leaf_values[index_of_proof(&p)] =
Some(p.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
// Upon receiving 2f + 1 matching READY(h)
// messages, wait for N 2 f ECHO messages,
// then decode v. Return the decoded v to ACS.
if state.ready_to_decode
&& state.leaf_values_num >= self.num_nodes - 2 * self.num_faulty_nodes
{
let value = decode_from_shards(
&mut state.leaf_values,
&self.coding,
self.data_shard_num,
h,
)?;
Ok((Some(value), no_outgoing))
} else if state.leaf_values_num >= self.num_nodes - self.num_faulty_nodes {
let result: Result<ProposedValue, Error> = decode_from_shards(
&mut state.leaf_values,
&self.coding,
self.data_shard_num,
h,
);
result?;
// if Ready has not yet been sent, multicast
// Ready
if !state.ready_sent {
state.ready_sent = true;
Ok((
None,
VecDeque::from(vec![TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: BroadcastMessage::Ready(h.to_owned()),
}]),
))
} else {
Ok((None, no_outgoing))
}
} else {
Ok((None, no_outgoing))
}
} else {
debug!("Broadcast/{:?} cannot validate Echo {:?}", self.uid, p);
Ok((None, no_outgoing))
}
} else {
error!("Broadcast/{:?} root hash not initialised", self.uid);
Ok((None, no_outgoing))
}
}
BroadcastMessage::Ready(ref hash) => {
// Update the number Ready has been received with this hash.
*state.readys.entry(hash.to_vec()).or_insert(1) += 1;
// Check that the root hash matches.
if let Some(ref h) = state.root_hash.clone() {
let ready_num = *state.readys.get(h).unwrap_or(&0);
let mut outgoing = VecDeque::new();
// Upon receiving f + 1 matching Ready(h) messages, if Ready
// has not yet been sent, multicast Ready(h).
if (ready_num == self.num_faulty_nodes + 1) && !state.ready_sent {
// Enqueue a broadcast of a ready message.
outgoing.push_back(TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: BroadcastMessage::Ready(h.to_vec()),
});
}
let mut output = None;
// Upon receiving 2f + 1 matching Ready(h) messages, wait
// for N 2f Echo messages, then decode v.
if ready_num > 2 * self.num_faulty_nodes {
// Wait for N - 2f Echo messages, then decode v.
if state.echo_num >= self.num_nodes - 2 * self.num_faulty_nodes {
let value = decode_from_shards(
&mut state.leaf_values,
&self.coding,
self.data_shard_num,
h,
)?;
if !state.has_output {
output = Some(value);
state.has_output = true;
}
} else {
state.ready_to_decode = true;
}
}
Ok((output, outgoing))
} else {
Ok((None, no_outgoing))
}
if let Some(ref h) = state.root_hash.clone() {
if p.validate(h.as_slice()) {
// Save the leaf value for reconstructing the tree
// later.
state.leaf_values[index_of_proof(&p)] = Some(p.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
}
}
// Enqueue a broadcast of an echo of this proof.
let msgs = VecDeque::from(vec![TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: BroadcastMessage::Echo(p.clone()),
}]);
let (output, echo_msgs) = self.handle_echo(p, state)?;
Ok((output, msgs.into_iter().chain(echo_msgs).collect()))
}
/// Handles a received echo and verifies the proof it contains.
fn handle_echo(
&self,
p: &Proof<ProposedValue>,
mut state: RwLockWriteGuard<BroadcastState>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
if state.root_hash.is_none() {
state.root_hash = Some(p.root_hash.clone());
debug!("Node {:?} Echo root hash {:?}", self.uid, state.root_hash);
}
// Call validate with the root hash as argument.
let h = if let Some(h) = state.root_hash.clone() {
h
} else {
error!("Broadcast/{:?} root hash not initialised", self.uid);
return Ok((None, VecDeque::new()));
};
if !p.validate(h.as_slice()) {
debug!("Broadcast/{:?} cannot validate Echo {:?}", self.uid, p);
return Ok((None, VecDeque::new()));
}
state.echo_num += 1;
// Save the leaf value for reconstructing the
// tree later.
state.leaf_values[index_of_proof(&p)] = Some(p.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
// Upon receiving 2f + 1 matching READY(h)
// messages, wait for N 2 f ECHO messages,
// then decode v. Return the decoded v to ACS.
if state.leaf_values_num < self.num_nodes - self.num_faulty_nodes {
return Ok((None, VecDeque::new()));
}
let value = decode_from_shards(
&mut state.leaf_values,
&self.coding,
self.data_shard_num,
&h,
)?;
if state.ready_to_decode
&& state.leaf_values_num >= self.num_nodes - 2 * self.num_faulty_nodes
{
return Ok((Some(value), VecDeque::new()));
}
// if Ready has not yet been sent, multicast Ready
if state.ready_sent {
return Ok((None, VecDeque::new()));
}
state.ready_sent = true;
let msg = TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: BroadcastMessage::Ready(h.to_owned()),
};
let (output, ready_msgs) = self.handle_ready(&h, state)?;
Ok((output, iter::once(msg).chain(ready_msgs).collect()))
}
fn handle_ready(
&self,
hash: &[u8],
mut state: RwLockWriteGuard<BroadcastState>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
// Update the number Ready has been received with this hash.
*state.readys.entry(hash.to_vec()).or_insert(1) += 1;
// Check that the root hash matches.
let h = if let Some(h) = state.root_hash.clone() {
h
} else {
return Ok((None, VecDeque::new()));
};
let ready_num = *state.readys.get(&h).unwrap_or(&0);
let mut outgoing = VecDeque::new();
// Upon receiving f + 1 matching Ready(h) messages, if Ready
// has not yet been sent, multicast Ready(h).
if (ready_num == self.num_faulty_nodes + 1) && !state.ready_sent {
// Enqueue a broadcast of a ready message.
outgoing.push_back(TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: BroadcastMessage::Ready(h.to_vec()),
});
}
let mut output = None;
// Upon receiving 2f + 1 matching Ready(h) messages, wait
// for N 2f Echo messages, then decode v.
if ready_num > 2 * self.num_faulty_nodes {
// Wait for N - 2f Echo messages, then decode v.
if state.echo_num >= self.num_nodes - 2 * self.num_faulty_nodes {
let value = decode_from_shards(
&mut state.leaf_values,
&self.coding,
self.data_shard_num,
&h,
)?;
if !state.has_output {
output = Some(value);
state.has_output = true;
}
} else {
state.ready_to_decode = true;
}
}
Ok((output, outgoing))
}
}

View File

@ -161,11 +161,9 @@ impl<A: Adversary> TestNetwork<A> {
ref message,
} => {
for node in self.nodes.values_mut() {
// TODO: `Broadcast` currently assumes that messages to `All` also reach
// ourselves.
//if node.id != sender_id {
node.queue.push_back((sender_id, message.clone()))
//}
if node.id != sender_id {
node.queue.push_back((sender_id, message.clone()))
}
}
self.adversary.push_message(sender_id, msg.clone());
}