ported sharding and sending out Merkle tree proofs

Vladimir Komendantskiy 2018-04-27 13:19:39 +01:00
parent 3ae0984733
commit 329adc3c2b
2 changed files with 133 additions and 13 deletions


@@ -1,5 +1,5 @@
//! Reliable broadcast algorithm instance.
-use std::collections::{HashMap, VecDeque};
+use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::{Arc, Mutex, RwLock};
@@ -28,18 +28,26 @@ struct BroadcastState {
ready_to_decode: bool,
}
/// Reliable Broadcast algorithm instance.
pub struct Broadcast {
/// The UID of this node.
uid: NodeUid,
/// UIDs of all nodes for iteration purposes.
all_uids: HashSet<NodeUid>,
num_nodes: usize,
num_faulty_nodes: usize,
data_shard_num: usize,
coding: ReedSolomon,
-/// Mutable state
+/// All the mutable state is confined to the `state` field. This makes it
+/// possible to mutate state even when the broadcast instance is referred
+/// to by an immutable reference.
state: RwLock<BroadcastState>
}
impl Broadcast {
-pub fn new(uid: NodeUid, num_nodes: usize) -> Result<Self, Error> {
+pub fn new(uid: NodeUid, all_uids: HashSet<NodeUid>, num_nodes: usize) ->
+Result<Self, Error>
+{
let num_faulty_nodes = (num_nodes - 1) / 3;
let parity_shard_num = 2 * num_faulty_nodes;
let data_shard_num = num_nodes - parity_shard_num;
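
The shard counts above follow the Byzantine bound N = 3f + 1: with 2f parity shards, any N - 2f correct shards suffice to reconstruct the value. A minimal standalone sketch of the same arithmetic (the helper name is ours, for illustration only, not part of the diff):

// Editorial sketch of the shard arithmetic in `Broadcast::new`.
fn shard_counts(num_nodes: usize) -> (usize, usize, usize) {
    let num_faulty_nodes = (num_nodes - 1) / 3; // f, the tolerated faults
    let parity_shard_num = 2 * num_faulty_nodes; // 2f parity shards
    let data_shard_num = num_nodes - parity_shard_num; // N - 2f data shards
    (num_faulty_nodes, parity_shard_num, data_shard_num)
}

fn main() {
    // N = 4 nodes: f = 1, so 2 parity and 2 data shards.
    assert_eq!(shard_counts(4), (1, 2, 2));
    // N = 7 nodes: f = 2, so 4 parity and 3 data shards.
    assert_eq!(shard_counts(7), (2, 4, 3));
}
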
@@ -47,6 +55,7 @@ impl Broadcast {
Ok(Broadcast {
uid,
all_uids,
num_nodes,
num_faulty_nodes: num_faulty_nodes,
data_shard_num: data_shard_num,
@@ -63,8 +72,8 @@ impl Broadcast {
})
}
-// The message-driven interface function for calls from the main message
-// loop.
+/// The message-driven interface function for calls from the main message
+/// loop.
pub fn on_message<E>(&self, m: QMessage, tx: Sender<QMessage>) ->
Result<MessageLoopState, E>
where E: From<Error> + From<messaging::Error>
@@ -75,7 +84,7 @@ impl Broadcast {
}) => {
match message {
AlgoMessage::BroadcastInput(value) => {
-Err(Error::NotImplemented).map_err(E::from)
+self.on_local_message(&mut value.to_owned(), tx)
}
_ => Err(Error::UnexpectedMessage).map_err(E::from)
@@ -87,7 +96,7 @@ impl Broadcast {
message
}) => {
if let Message::Broadcast(b) = message {
-self.on_remote_message(uid, b, tx)
+self.on_remote_message(uid, &b, tx)
}
else {
Err(Error::UnexpectedMessage).map_err(E::from)
@@ -97,9 +106,117 @@ impl Broadcast {
_ => Err(Error::UnexpectedMessage).map_err(E::from)
}}
/// Processes the proposed value input by broadcasting it.
fn on_local_message<E>(&self, value: &mut ProposedValue,
tx: Sender<QMessage>) ->
Result<MessageLoopState, E>
where E: From<Error> + From<messaging::Error>
{
let mut state = self.state.write().unwrap();
// Split the value into chunks/shards, encode them with erasure codes.
// Assemble a Merkle tree from data and parity shards. Take all proofs
// from this tree and send them, each to its own node.
self.send_shards(value, tx)
.map(|(proof, remote_messages)| {
// Record the first proof as if it were sent by the node to
// itself.
let h = proof.root_hash.clone();
if proof.validate(h.as_slice()) {
// Save the leaf value for reconstructing the tree later.
state.leaf_values[index_of_proof(&proof)] =
Some(proof.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
state.root_hash = Some(h);
}
MessageLoopState::Processing(remote_messages)
})
}
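
The `send_shards` function below splits the length-prefixed value into `data_shard_num` chunks of equal size, rounding the chunk size up and zero-padding the remainder. A quick standalone check of that size arithmetic, with numbers assumed purely for illustration:

fn main() {
    // Mirrors the `shard_len` computation in `send_shards`; an 11-byte
    // prefixed value over 4 data and 2 parity shards is assumed here.
    let (value_len, data_shard_num, parity_shard_num) = (11, 4, 2);
    let shard_len = if value_len % data_shard_num > 0 {
        value_len / data_shard_num + 1 // round up
    } else {
        value_len / data_shard_num
    };
    assert_eq!(shard_len, 3);
    // After padding, all 6 shards are exactly `shard_len` bytes long.
    assert_eq!(shard_len * (data_shard_num + parity_shard_num), 18);
}
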
/// Breaks the input value into shards of equal length and encodes them --
/// and some extra parity shards -- with a Reed-Solomon erasure coding
/// scheme. The returned value contains the shard assigned to this
/// node. That shard doesn't need to be sent anywhere. It gets recorded in
/// the broadcast instance.
fn send_shards<E>(&self, value: &mut ProposedValue, tx: Sender<QMessage>) ->
Result<(Proof<ProposedValue>, VecDeque<RemoteMessage>), E>
where E: From<Error> + From<messaging::Error>
{
let data_shard_num = self.coding.data_shard_count();
let parity_shard_num = self.coding.parity_shard_count();
debug!("Data shards: {}, parity shards: {}",
self.data_shard_num, parity_shard_num);
// Insert the length of `value` so it can be decoded without the padding.
let payload_len = value.len() as u8;
value.insert(0, payload_len); // TODO: Handle messages larger than 255
// bytes.
let value_len = value.len();
// Size of a Merkle tree leaf value, in bytes.
let shard_len = if value_len % data_shard_num > 0 {
value_len / data_shard_num + 1
}
else {
value_len / data_shard_num
};
// Pad the last data shard with zeros. Fill the parity shards with
// zeros.
value.resize(shard_len * (data_shard_num + parity_shard_num), 0);
debug!("value_len {}, shard_len {}", value_len, shard_len);
// Divide the vector into chunks/shards.
let shards_iter = value.chunks_mut(shard_len);
// Convert the iterator over slices into a vector of slices.
let mut shards: Vec<&mut [u8]> = Vec::new();
for s in shards_iter {
shards.push(s);
}
debug!("Shards before encoding: {:?}", shards);
// Construct the parity chunks/shards
self.coding.encode(shards.as_mut_slice()).map_err(Error::from)?;
debug!("Shards: {:?}", shards);
let shards_t: Vec<ProposedValue> =
shards.into_iter().map(|s| s.to_vec()).collect();
// Build a Merkle tree from the data and parity shards. Proofs (compound
// tree branches) are generated from it below, one per node.
let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards_t);
// Default result in case of `gen_proof` error.
let mut result = Err(Error::ProofConstructionFailed);
let mut outgoing = VecDeque::new();
// Send each proof to a node.
for (leaf_value, uid) in mtree.iter().zip(self.all_uids.clone()) {
let proof = mtree.gen_proof(leaf_value.to_vec());
if let Some(proof) = proof {
if uid == self.uid {
// The proof is addressed to this node.
result = Ok(proof);
}
else {
// Rest of the proofs are sent to remote nodes.
outgoing.push_back(
RemoteMessage {
node: RemoteNode::Node(uid),
message: Message::Broadcast(
BroadcastMessage::Value(proof))
});
}
}
}
result.map(|r| (r, outgoing)).map_err(E::from)
}
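
End to end, `send_shards` amounts to the following standalone sketch. It mirrors the crate calls exactly as the diff uses them (`ReedSolomon::encode` over `&mut [&mut [u8]]`, `MerkleTree::from_vec`, `gen_proof`, `Proof::validate`); the 4-node parameters and the sample value are assumptions for illustration:

extern crate merkle;
extern crate reed_solomon_erasure;
extern crate ring;

use merkle::MerkleTree;
use reed_solomon_erasure::ReedSolomon;

fn main() {
    // Illustrative 4-node setup: f = 1, hence 2 data and 2 parity shards.
    let coding = ReedSolomon::new(2, 2).unwrap();

    // Length-prefix and zero-pad the value, as in `send_shards`.
    let mut value = b"abcde".to_vec();
    let payload_len = value.len() as u8;
    value.insert(0, payload_len);
    let shard_len = 3; // ceil(6 bytes / 2 data shards)
    value.resize(shard_len * 4, 0); // room for 2 data + 2 parity shards

    // Fill in the parity shards in place over mutable chunks.
    {
        let mut shards: Vec<&mut [u8]> =
            value.chunks_mut(shard_len).collect();
        coding.encode(shards.as_mut_slice()).unwrap();
    }

    // Build a Merkle tree over all shards; each generated proof
    // validates against the tree's root hash.
    let shards: Vec<Vec<u8>> =
        value.chunks(shard_len).map(|s| s.to_vec()).collect();
    let mtree = MerkleTree::from_vec(&ring::digest::SHA256, shards);
    for leaf_value in mtree.iter() {
        let proof = mtree.gen_proof(leaf_value.to_vec()).unwrap();
        assert!(proof.validate(proof.root_hash.as_slice()));
    }
}
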
/// Handler of messages received from remote nodes.
fn on_remote_message<E>(&self, uid: NodeUid,
-message: BroadcastMessage<ProposedValue>,
+message: &BroadcastMessage<ProposedValue>,
tx: Sender<QMessage>) ->
Result<MessageLoopState, E>
where E: From<Error> + From<messaging::Error>
@@ -137,7 +254,7 @@ impl Broadcast {
vec![RemoteMessage {
node: RemoteNode::All,
message: Message::Broadcast(
-BroadcastMessage::Echo(p))
+BroadcastMessage::Echo(p.clone()))
}]
)))
}


@@ -158,15 +158,18 @@ fn create_test_nodes(num_nodes: usize,
txs.insert(addr, net.tx(n, m));
rxs.insert(addr, net.rx(m, n));
}
let uid = node_addr(n);
let all_uids: HashSet<NodeUid> =
(0..num_nodes).into_iter().map(|k| node_addr(k)).collect();
let all_uids_copy = all_uids.clone();
// Create a broadcast algorithm instance for each node.
let mut broadcast_instances = HashMap::new();
-for k in 0..num_nodes {
-let them_uid = node_addr(k);
-match Broadcast::new(them_uid, num_nodes) {
+for uid in all_uids {
+match Broadcast::new(uid, all_uids_copy.clone(), num_nodes) {
Ok(instance) => {
-broadcast_instances.insert(them_uid, instance);
+broadcast_instances.insert(uid, instance);
},
Err(e) => {
panic!("{:?}", e);