made the sharding step a function to reduce the size of inner_run

Vladimir Komendantskiy 2018-03-28 14:51:58 +01:00
parent 264b72011c
commit 182c6e65f5
1 changed file with 61 additions and 47 deletions


@ -100,6 +100,66 @@ where Vec<u8>: From<T>
}
}
/// Breaks the input value into data shards of equal length, adds parity
/// shards computed with a Reed-Solomon erasure coding scheme, and sends a
/// Merkle proof of each shard over the `tx` channel.
fn send_shards<T>(value: T,
tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
coding: &ReedSolomon,
data_shard_num: usize,
parity_shard_num: usize)
where T: Clone + Debug + Send + Sync + Into<Vec<u8>>
+ From<Vec<u8>> + AsRef<[u8]>
, Vec<u8>: From<T>
{
let mut v: Vec<u8> = Vec::from(value);
// Pad the value vector with zeros so its length is a multiple of the number
// of data shards, e.g. a 9-byte value with 4 data shards gets 3 zero bytes.
let shard_pad_len = (data_shard_num - v.len() % data_shard_num) % data_shard_num;
for _i in 0..shard_pad_len {
v.push(0);
}
// Size of each shard, and of a Merkle tree leaf value, in bytes. The vector
// length is now evenly divisible by the number of data shards.
let shard_len = v.len() / data_shard_num;
// Reserve zeroed space for the parity shards; `coding.encode` fills it in.
for _i in 0..shard_len * parity_shard_num {
v.push(0);
}
// Divide the vector into mutable chunks/shards.
let mut shards: Vec<&mut [u8]> = v.chunks_mut(shard_len).collect();
// Compute the parity shards in place.
coding.encode(shards.as_mut_slice()).unwrap();
// Convert shards back to type `T` for proof generation.
let shards_t: Vec<T> = shards.iter().map(|s| T::from(s.to_vec())).collect();
// Build a Merkle tree over the shards so that each one can be sent with a
// proof of inclusion.
let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards_t);
// Send each proof to a node.
//
// FIXME: use a single consumer TX channel.
for leaf_value in mtree.iter().cloned() {
let proof = mtree.gen_proof(leaf_value);
if let Some(proof) = proof {
tx.lock().unwrap().send(Message::Broadcast(
BroadcastMessage::Value(proof))).unwrap();
}
}
}
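
For reference, a minimal standalone sketch of the same shard-and-prove pipeline, runnable outside the task loop. The shard counts and input bytes are invented for illustration; the crate calls (`ReedSolomon::new`, `encode`, `MerkleTree::from_vec`, `gen_proof`) mirror the ones used in this diff, and `Vec<u8>` stands in for `T`:

// A standalone sketch, not part of this commit: shard counts and the input
// value are made up; the crate calls mirror those in `send_shards`.
extern crate merkle;
extern crate reed_solomon_erasure;
extern crate ring;

use merkle::MerkleTree;
use reed_solomon_erasure::ReedSolomon;

fn main() {
    let (data_shard_num, parity_shard_num) = (4, 2);
    let coding = ReedSolomon::new(data_shard_num, parity_shard_num).unwrap();

    // Pad the value exactly as `send_shards` does.
    let mut v = b"example broadcast value".to_vec();
    let pad = (data_shard_num - v.len() % data_shard_num) % data_shard_num;
    v.extend(vec![0u8; pad]);
    let shard_len = v.len() / data_shard_num;
    v.extend(vec![0u8; shard_len * parity_shard_num]);

    // Encode the parity shards and build the Merkle tree over all shards.
    let mut shards: Vec<&mut [u8]> = v.chunks_mut(shard_len).collect();
    coding.encode(shards.as_mut_slice()).unwrap();
    let leaves: Vec<Vec<u8>> = shards.iter().map(|s| s.to_vec()).collect();
    let mtree = MerkleTree::from_vec(&ring::digest::SHA256, leaves);

    // One proof of inclusion per shard -- this is what gets broadcast.
    for leaf in mtree.iter().cloned() {
        assert!(mtree.gen_proof(leaf).is_some());
    }
}
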
/// The main loop of the broadcast task.
fn inner_run<T>(tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
@ -126,53 +186,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
//
// FIXME: Does the node send a proof to itself?
if let Some(v) = broadcast_value {
let mut v: Vec<u8> = Vec::from(v).to_owned();
// Pad the value vector with zeros to allow for shards of equal sizes.
let shard_pad_len = v.len() % data_shard_num;
for _i in 0..shard_pad_len {
v.push(0);
}
// Size of a Merkle tree leaf value, in bytes.
// Now the vector length is evenly divisible by the number of shards.
let shard_len = v.len() / data_shard_num;
// Pad the parity shards with zeros.
for _i in 0 .. shard_len * parity_shard_num {
v.push(0);
}
// Divide the vector into chunks/shards.
let shards_iter = v.chunks_mut(shard_len);
// Convert the iterator over slices into a vector of slices.
let mut shards: Vec<&mut [u8]> = Vec::new();
for s in shards_iter {
shards.push(s);
}
// Construct the parity chunks/shards
coding.encode(shards.as_mut_slice()).unwrap();
// Convert shards back to type `T` for proof generation.
let mut shards_T: Vec<T> = Vec::new();
for s in shards.iter() {
let s = Vec::into(s.to_vec());
shards_T.push(s);
}
// Convert the Merkle tree into a partial binary tree for later
// deconstruction into compound branches.
let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards_T);
// Send each proof to a node.
//
// FIXME: use a single consumer TX channel.
for leaf_value in mtree.iter().cloned() {
let proof = mtree.gen_proof(leaf_value);
if let Some(proof) = proof {
tx.lock().unwrap().send(Message::Broadcast(
BroadcastMessage::Value(proof))).unwrap();
}
}
send_shards(v, tx.clone(), &coding, data_shard_num, parity_shard_num);
}
// TODO: handle exit conditions
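
Since the zero-padding arithmetic is easy to get wrong (padding by the raw remainder only works when twice the remainder is a multiple of the shard count), here is a tiny self-check of the formula used above; `pad_len` is a hypothetical helper, not a function in this codebase:

// Hypothetical helper for checking the padding formula; not in this codebase.
fn pad_len(len: usize, data_shard_num: usize) -> usize {
    (data_shard_num - len % data_shard_num) % data_shard_num
}

fn main() {
    for len in 0..64 {
        for n in 1..9 {
            // The padded length must always divide evenly into `n` shards.
            assert_eq!((len + pad_len(len, n)) % n, 0);
        }
    }
}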