Make Broadcast handle large payloads.

Andreas Fackler 2018-05-30 15:33:33 +02:00
parent 2bd1269366
commit c91fa13213
1 changed file with 21 additions and 19 deletions


@@ -3,12 +3,13 @@ use std::fmt::{self, Debug};
use std::iter::once;
use std::rc::Rc;
-use fmt::{HexBytes, HexList, HexProof};
+use byteorder::{BigEndian, ByteOrder};
use merkle::{MerkleTree, Proof};
use reed_solomon_erasure as rse;
use reed_solomon_erasure::ReedSolomon;
use ring::digest;
+use fmt::{HexBytes, HexList, HexProof};
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
error_chain!{
@@ -131,8 +132,6 @@ impl<N: Eq + Debug + Clone + Ord> DistAlgorithm for Broadcast<N> {
// from this tree and send them, each to its own node.
let proof = self.send_shards(input)?;
let our_uid = &self.netinfo.our_uid().clone();
-// TODO: We'd actually need to return the output here, if it was only one node. Should that
-// use-case be supported?
self.handle_value(our_uid, proof)
}
@@ -201,8 +200,9 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
self.data_shard_num, parity_shard_num
);
// Insert the length of `v` so it can be decoded without the padding.
-let payload_len = value.len() as u8;
-value.insert(0, payload_len); // TODO: Handle messages larger than 255 bytes.
+let payload_len = value.len() as u32;
+value.splice(0..0, 0..4); // Insert four bytes at the beginning.
+BigEndian::write_u32(&mut value[..4], payload_len); // Write the size.
let value_len = value.len();
// Size of a Merkle tree leaf value, in bytes.
let shard_len = if value_len % data_shard_num > 0 {
@@ -224,7 +224,9 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
debug!("Shards before encoding: {:?}", HexList(&shards));
// Construct the parity chunks/shards
-self.coding.encode(&mut shards)?;
+self.coding
+.encode(&mut shards)
+.expect("the size and number of shards is correct");
debug!("Shards: {:?}", HexList(&shards));
@@ -365,7 +367,6 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
}
// Upon receiving 2f + 1 matching Ready(h) messages, wait for N - 2f Echo messages.
-self.decided = true;
let mut leaf_values: Vec<Option<Box<[u8]>>> = self
.netinfo
.all_uids()
@@ -380,8 +381,9 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
})
})
.collect();
-let value = decode_from_shards(&mut leaf_values, &self.coding, self.data_shard_num, hash)?;
-self.output = Some(value);
+let value = decode_from_shards(&mut leaf_values, &self.coding, self.data_shard_num, hash);
+self.decided = value.is_some();
+self.output = value;
Ok(())
}
@@ -494,12 +496,14 @@ fn decode_from_shards<T>(
coding: &Coding,
data_shard_num: usize,
root_hash: &[u8],
-) -> BroadcastResult<T>
+) -> Option<T>
where
T: From<Vec<u8>>,
{
// Try to interpolate the Merkle tree using the Reed-Solomon erasure coding scheme.
-coding.reconstruct_shards(leaf_values)?;
+coding
+.reconstruct_shards(leaf_values)
+.expect("enough shards are present");
// Recompute the Merkle tree root.
@@ -516,13 +520,10 @@ where
// If the root hash of the reconstructed tree does not match the one
// received with proofs then abort.
if &mtree.root_hash()[..] != root_hash {
-// NOTE: The paper does not define the meaning of *abort*. But it is
-// sensible not to continue trying to reconstruct the tree after this
-// point. This instance must have received incorrect shards.
-Err(ErrorKind::RootHashMismatch.into())
+None // The proposer is faulty.
} else {
// Reconstruct the value from the data shards.
-Ok(glue_shards(mtree, data_shard_num))
+Some(glue_shards(mtree, data_shard_num))
}
}
@@ -537,8 +538,9 @@ where
.take(n)
.flat_map(|s| s.into_iter().skip(1)) // Drop the index byte.
.collect();
-let payload_len = t[0] as usize;
-debug!("Glued data shards {:?}", HexBytes(&t[1..(payload_len + 1)]));
+let payload_len = BigEndian::read_u32(&t[..4]) as usize;
+let payload = &t[4..(payload_len + 4)];
+debug!("Glued data shards {:?}", HexBytes(payload));
-t[1..(payload_len + 1)].to_vec().into()
+payload.to_vec().into()
}
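
For context, a minimal standalone sketch of the framing scheme this commit introduces, assuming only the byteorder crate: the payload length is written as a big-endian u32 prefix and the buffer is zero-padded to a whole number of data shards before erasure coding; after the shards are glued back together, the prefix says how much of the buffer is real payload. The frame/unframe helpers and the ceiling-division padding below are illustrative only, not part of the Broadcast implementation.

use byteorder::{BigEndian, ByteOrder};

// Illustrative helper (not in the repo): length-prefix a payload and pad it
// to a multiple of `data_shard_num`, mirroring what `send_shards` now does
// before Reed-Solomon encoding.
fn frame(mut value: Vec<u8>, data_shard_num: usize) -> Vec<u8> {
    let payload_len = value.len() as u32;
    value.splice(0..0, 0..4); // Reserve four bytes at the beginning.
    BigEndian::write_u32(&mut value[..4], payload_len); // Write the size.
    // Zero-pad so the buffer splits evenly into data shards (ceiling division).
    let shard_len = (value.len() + data_shard_num - 1) / data_shard_num;
    value.resize(shard_len * data_shard_num, 0);
    value
}

// Illustrative inverse, mirroring `glue_shards`: read the length prefix and
// strip both the prefix and the padding.
fn unframe(t: &[u8]) -> Vec<u8> {
    let payload_len = BigEndian::read_u32(&t[..4]) as usize;
    t[4..(payload_len + 4)].to_vec()
}

fn main() {
    let value = vec![42u8; 300]; // Longer than the old 255-byte limit.
    let framed = frame(value.clone(), 4);
    assert_eq!(framed.len() % 4, 0);
    assert_eq!(unframe(&framed), value);
}

With the prefix widened from one byte to four, the framing supports payloads up to u32::MAX bytes instead of 255.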