removed some unnecessary cloning

This also adds a few TODOs to the broadcast implementation: Some of
these issues need a regression test. (Oh, and a fix!)
This commit is contained in:
Andreas Fackler 2018-05-04 09:58:21 +02:00
parent cde3a879e5
commit 8bced81438
3 changed files with 62 additions and 72 deletions

View File

@ -94,7 +94,7 @@ impl Broadcast<messaging::NodeUid> {
message,
}) => {
if let Message::Broadcast(b) = message {
self.on_remote_message(uid, &b, tx)
self.on_remote_message(uid, b, tx)
} else {
Err(Error::UnexpectedMessage)
}
@ -107,7 +107,7 @@ impl Broadcast<messaging::NodeUid> {
fn on_remote_message(
&self,
uid: messaging::NodeUid,
message: &BroadcastMessage<ProposedValue>,
message: BroadcastMessage<ProposedValue>,
tx: &Sender<QMessage>,
) -> Result<MessageLoopState, Error> {
let (output, messages) = self.handle_broadcast_message(&uid, message)?;
@ -207,22 +207,17 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
// Split the value into chunks/shards, encode them with erasure codes.
// Assemble a Merkle tree from data and parity shards. Take all proofs
// from this tree and send them, each to its own node.
self.send_shards(value)
.map_err(Error::from)
.map(|(proof, remote_messages)| {
// Record the first proof as if it were sent by the node to
// itself.
let h = proof.root_hash.clone();
if proof.validate(h.as_slice()) {
// Save the leaf value for reconstructing the tree later.
state.leaf_values[index_of_proof(&proof)] =
Some(proof.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
state.root_hash = Some(h);
}
self.send_shards(value).map(|(proof, remote_messages)| {
// Record the first proof as if it were sent by the node to itself.
let h = proof.root_hash.clone();
// Save the leaf value for reconstructing the tree later.
state.leaf_values[index_of_proof(&proof)] =
Some(proof.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
state.root_hash = Some(h);
remote_messages
})
remote_messages
})
}
pub fn our_id(&self) -> &NodeUid {
@ -247,8 +242,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
);
// Insert the length of `v` so it can be decoded without the padding.
let payload_len = value.len() as u8;
value.insert(0, payload_len); // TODO: Handle messages larger than 255
// bytes.
value.insert(0, payload_len); // TODO: Handle messages larger than 255 bytes.
let value_len = value.len();
// Size of a Merkle tree leaf value, in bytes.
let shard_len = if value_len % data_shard_num > 0 {
@ -265,17 +259,12 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
// Divide the vector into chunks/shards.
let shards_iter = value.chunks_mut(shard_len);
// Convert the iterator over slices into a vector of slices.
let mut shards: Vec<&mut [u8]> = Vec::new();
for s in shards_iter {
shards.push(s);
}
let mut shards: Vec<&mut [u8]> = shards_iter.collect();
debug!("Shards before encoding: {:?}", shards);
// Construct the parity chunks/shards
self.coding
.encode(shards.as_mut_slice())
.map_err(Error::from)?;
self.coding.encode(&mut shards)?;
debug!("Shards: {:?}", shards);
@ -290,19 +279,21 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
let mut outgoing = VecDeque::new();
// Send each proof to a node.
// TODO: This generates the wrong proof if a leaf occurs more than once. Consider using the
// `merkle_light` crate instead.
for (leaf_value, uid) in mtree.iter().zip(self.all_uids.clone()) {
let proof = mtree.gen_proof(leaf_value.to_vec());
if let Some(proof) = proof {
if uid == self.our_id {
// The proof is addressed to this node.
result = Ok(proof);
} else {
// Rest of the proofs are sent to remote nodes.
outgoing.push_back(TargetedBroadcastMessage {
target: BroadcastTarget::Node(uid),
message: BroadcastMessage::Value(proof),
});
}
let proof = mtree
.gen_proof(leaf_value.to_vec())
.ok_or(Error::ProofConstructionFailed)?;
if uid == self.our_id {
// The proof is addressed to this node.
result = Ok(proof);
} else {
// Rest of the proofs are sent to remote nodes.
outgoing.push_back(TargetedBroadcastMessage {
target: BroadcastTarget::Node(uid),
message: BroadcastMessage::Value(proof),
});
}
}
@ -313,13 +304,13 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
pub fn handle_broadcast_message(
&self,
sender_id: &NodeUid,
message: &BroadcastMessage<ProposedValue>,
message: BroadcastMessage<ProposedValue>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
let state = self.state.write().unwrap();
match message {
BroadcastMessage::Value(p) => self.handle_value(sender_id, p, state),
BroadcastMessage::Echo(p) => self.handle_echo(p, state),
BroadcastMessage::Ready(ref hash) => self.handle_ready(hash, state),
BroadcastMessage::Ready(hash) => self.handle_ready(hash, state),
}
}
@ -327,7 +318,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
fn handle_value(
&self,
sender_id: &NodeUid,
p: &Proof<ProposedValue>,
p: Proof<ProposedValue>,
mut state: RwLockWriteGuard<BroadcastState>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
if *sender_id != self.proposer_id {
@ -343,13 +334,12 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
);
}
if let Some(ref h) = state.root_hash.clone() {
if p.validate(h.as_slice()) {
// Save the leaf value for reconstructing the tree
// later.
state.leaf_values[index_of_proof(&p)] = Some(p.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
}
if state.root_hash.as_ref().map_or(false, |h| p.validate(h)) {
// TODO: Should messages failing this be echoed at all?
// Save the leaf value for reconstructing the tree later.
let idx = index_of_proof(&p);
state.leaf_values[idx] = Some(p.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
}
// Enqueue a broadcast of an echo of this proof.
@ -364,7 +354,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
/// Handles a received echo and verifies the proof it contains.
fn handle_echo(
&self,
p: &Proof<ProposedValue>,
p: Proof<ProposedValue>,
mut state: RwLockWriteGuard<BroadcastState>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
if state.root_hash.is_none() {
@ -389,9 +379,9 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
}
state.echo_num += 1;
// Save the leaf value for reconstructing the
// tree later.
state.leaf_values[index_of_proof(&p)] = Some(p.value.clone().into_boxed_slice());
// Save the leaf value for reconstructing the tree later.
let idx = index_of_proof(&p);
state.leaf_values[idx] = Some(p.value.into_boxed_slice());
state.leaf_values_num += 1;
// Upon receiving 2f + 1 matching READY(h)
@ -401,6 +391,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
return Ok((None, VecDeque::new()));
}
// TODO: Only decode once. Don't repeat for every ECHO message.
let value = decode_from_shards(
&mut state.leaf_values,
&self.coding,
@ -408,9 +399,8 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
&h,
)?;
if state.ready_to_decode
&& state.leaf_values_num >= self.num_nodes - 2 * self.num_faulty_nodes
{
if state.ready_to_decode && !state.has_output {
state.has_output = true;
return Ok((Some(value), VecDeque::new()));
}
@ -422,19 +412,20 @@ impl<NodeUid: Eq + Hash + Debug + Clone> Broadcast<NodeUid> {
state.ready_sent = true;
let msg = TargetedBroadcastMessage {
target: BroadcastTarget::All,
message: BroadcastMessage::Ready(h.to_owned()),
message: BroadcastMessage::Ready(h.clone()),
};
let (output, ready_msgs) = self.handle_ready(&h, state)?;
let (output, ready_msgs) = self.handle_ready(h, state)?;
Ok((output, iter::once(msg).chain(ready_msgs).collect()))
}
fn handle_ready(
&self,
hash: &[u8],
hash: Vec<u8>,
mut state: RwLockWriteGuard<BroadcastState>,
) -> Result<(Option<ProposedValue>, MessageQueue<NodeUid>), Error> {
// Update the number Ready has been received with this hash.
*state.readys.entry(hash.to_vec()).or_insert(1) += 1;
// TODO: Don't accept multiple ready messages from the same node.
*state.readys.entry(hash).or_insert(1) += 1;
// Check that the root hash matches.
let h = if let Some(h) = state.root_hash.clone() {
@ -862,7 +853,7 @@ where
}
fn decode_from_shards<T>(
leaf_values: &mut Vec<Option<Box<[u8]>>>,
leaf_values: &mut [Option<Box<[u8]>>],
coding: &ReedSolomon,
data_shard_num: usize,
root_hash: &[u8],
@ -870,19 +861,16 @@ fn decode_from_shards<T>(
where
T: Clone + Debug + Hashable + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>,
{
// Try to interpolate the Merkle tree using the Reed-Solomon erasure coding
// scheme.
coding.reconstruct_shards(leaf_values.as_mut_slice())?;
// Try to interpolate the Merkle tree using the Reed-Solomon erasure coding scheme.
coding.reconstruct_shards(leaf_values)?;
// Recompute the Merkle tree root.
//
// Collect shards for tree construction.
let mut shards: Vec<ProposedValue> = Vec::new();
for l in leaf_values.iter() {
if let Some(ref v) = *l {
shards.push(v.to_vec());
}
}
let shards: Vec<ProposedValue> = leaf_values
.iter()
.filter_map(|l| l.as_ref().map(|v| v.to_vec()))
.collect();
// Construct the Merkle tree.
let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards);
// If the root hash of the reconstructed tree does not match the one
@ -959,6 +947,8 @@ fn index_of_path(mut path: Vec<bool>) -> usize {
}
/// Computes the Merkle tree leaf index of a value in a given proof.
// TODO: This currently only works if the number of leaves is a power of two. With the
// `merkle_light` crate, it might not even be needed, though.
fn index_of_proof(p: &Proof<ProposedValue>) -> usize {
index_of_path(path_of_lemma(&p.lemma))
}

View File

@ -120,7 +120,7 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
let input_result = {
if let Some(broadcast_instance) = self.broadcast_instances.get(&uid) {
broadcast_instance
.handle_broadcast_message(&uid, &bmessage)
.handle_broadcast_message(&uid, bmessage)
.map(|(value, queue)| {
instance_result = value;
queue.into_iter().map(Output::Broadcast).collect()

View File

@ -50,7 +50,7 @@ impl TestNode {
let (from_id, msg) = self.queue.pop_front().expect("message not found");
debug!("Handling {:?} -> {:?}: {:?}", from_id, self.id, msg);
let (output, msgs) = self.broadcast
.handle_broadcast_message(&from_id, &msg)
.handle_broadcast_message(&from_id, msg)
.expect("handling message");
if let Some(output) = output.clone() {
self.outputs.push(output);