decoder of broadcast value from Merkle tree leaves

This commit is contained in:
Vladimir Komendantskiy 2018-03-29 17:23:02 +01:00
parent 468cf90076
commit d8c57280d3
3 changed files with 118 additions and 57 deletions

View File

@ -1,10 +1,8 @@
//! Reliable broadcast algorithm.
use std::fmt::Debug;
use std::hash::Hash;
use std::collections::{HashSet, HashMap};
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
//use std::rc::Rc;
use spmc;
use crossbeam;
use proto::*;
@ -62,7 +60,7 @@ where Vec<u8>: From<T>
/// and an error in case of failure.
///
/// TODO: Detailed error status.
pub fn run(&mut self) -> Result<T, ()> {
pub fn run(&mut self) -> Result<T, BroadcastError> {
// Broadcast state machine thread.
//
// rx cannot be cloned due to its type constraint but can be used inside
@ -70,24 +68,31 @@ where Vec<u8>: From<T>
// reason). A `Mutex` is used to grant write access.
let rx = self.rx.to_owned();
let tx = self.tx.to_owned();
let final_value: Option<T> = None;
let final_value_r = Arc::new(Mutex::new(None));
let final_value: Option<Result<T, BroadcastError>> = None;
let final_value_r = Arc::new(Mutex::new(final_value.clone()));
let final_value_r_scoped = final_value_r.clone();
let bvalue = self.broadcast_value.to_owned();
crossbeam::scope(|scope| {
scope.spawn(move || {
*final_value_r.lock().unwrap() =
inner_run(tx, rx, bvalue);
*final_value_r_scoped.lock().unwrap() =
Some(inner_run(tx, rx, bvalue));
});
});
match final_value {
None => Err(()),
Some(v) => Ok(v)
Some(ref r) => r.clone(),
None => Err(BroadcastError::Threading)
}
}
}
/// Errors returned by the broadcast instance.
#[derive(Debug, Clone, PartialEq)]
pub enum BroadcastError {
RootHashMismatch,
Threading
}
/// Breaks the input value into shards of equal length and encodes them -- and
/// some extra parity shards -- with a Reed-Solomon erasure coding scheme.
fn send_shards<T>(value: T,
@ -151,7 +156,7 @@ where T: Clone + Debug + Send + Sync + Into<Vec<u8>>
/// The main loop of the broadcast task.
fn inner_run<T>(tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
broadcast_value: Option<T>) -> Option<T>
broadcast_value: Option<T>) -> Result<T, BroadcastError>
where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
+ From<Vec<u8>> + AsRef<[u8]>
, Vec<u8>: From<T>
@ -171,12 +176,11 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
}
// currently known leaf values
let mut leaf_values: Vec<Option<Box<[u8]>>> =
vec![None; PLACEHOLDER_N];
let mut leaf_values: Vec<Option<Box<[u8]>>> = vec![None; PLACEHOLDER_N];
// number of non-None leaf values
let mut leaf_values_num = 0;
// return value
let reconstructed_value: Option<T> = None;
let mut result: Option<Result<T, BroadcastError>> = None;
// Write-once root hash of a tree broadcast from the sender associated with
// this instance.
let mut root_hash: Option<Vec<u8>> = None;
@ -185,9 +189,10 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
// Number of times Ready was received with the same root hash.
let mut ready_num = 0;
let mut ready_sent = false;
let mut ready_to_decode = false;
// TODO: handle exit conditions
while reconstructed_value == None {
while result == None {
// Receive a message from the socket IO task.
let message = rx.lock().unwrap().recv().unwrap();
if let Message::Broadcast(message) = message {
@ -235,42 +240,33 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
.into_boxed_slice());
leaf_values_num = leaf_values_num + 1;
if leaf_values_num >= PLACEHOLDER_N - PLACEHOLDER_F {
// Try to interpolate the Merkle tree using the
// Reed-Solomon erasure coding scheme.
coding.reconstruct_shards(leaf_values
.as_mut_slice())
.unwrap();
// Recompute the Merkle tree root.
//
// Convert shards back to type `T` for tree
// construction.
let mut shards_t: Vec<T> = Vec::new();
for l in leaf_values.iter() {
if let Some(ref v) = *l {
let s = Vec::into(v.to_vec());
shards_t.push(s);
}
}
// Construct the Merkle tree.
let mtree = MerkleTree::from_vec(
&::ring::digest::SHA256, shards_t);
// If the root hash of the reconstructed tree
// does not match the one received with proofs
// then abort.
if *mtree.root_hash() != *h {
break;
}
// upon receiving 2f + 1 matching READY(h)
// messages, wait for N 2 f ECHO messages, then
// decode v
if ready_to_decode &&
leaf_values_num >=
PLACEHOLDER_N - 2 * PLACEHOLDER_F
{
result = Some(
decode_from_shards(&mut leaf_values,
&coding,
data_shard_num, h));
}
// if Ready has not yet been sent, multicast Ready
if !ready_sent {
ready_sent = true;
tx.lock().unwrap().send(Message::Broadcast(
BroadcastMessage::Ready(h.to_owned())))
.unwrap();
else if leaf_values_num >=
PLACEHOLDER_N - PLACEHOLDER_F
{
result = Some(
decode_from_shards(&mut leaf_values,
&coding,
data_shard_num, h));
// if Ready has not yet been sent, multicast
// Ready
if !ready_sent {
ready_sent = true;
tx.lock().unwrap().send(Message::Broadcast(
BroadcastMessage::Ready(h.to_owned())))
.unwrap();
}
}
}
}
@ -298,11 +294,17 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
// Upon receiving 2f + 1 matching Ready(h) messages,
// wait for N 2f Echo messages, then decode v.
if (ready_num > 2 * PLACEHOLDER_F) &&
(reconstructed_value == None) &&
(echo_num >= PLACEHOLDER_N - 2 * PLACEHOLDER_F)
{
// FIXME: decode v
if ready_num > 2 * PLACEHOLDER_F {
// Wait for N - 2f Echo messages, then decode v.
if echo_num >= PLACEHOLDER_N - 2 * PLACEHOLDER_F {
result = Some(
decode_from_shards(&mut leaf_values,
&coding,
data_shard_num, h));
}
else {
ready_to_decode = true;
}
}
}
}
@ -313,7 +315,66 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
message);
}
}
return reconstructed_value;
result.unwrap()
}
fn decode_from_shards<T>(leaf_values: &mut Vec<Option<Box<[u8]>>>,
coding: &ReedSolomon,
data_shard_num: usize,
root_hash: &Vec<u8>) ->
Result<T, BroadcastError>
where T: AsRef<[u8]> + From<Vec<u8>>, Vec<u8>: From<T>
{
// Try to interpolate the Merkle tree using the Reed-Solomon erasure coding
// scheme.
coding.reconstruct_shards(leaf_values.as_mut_slice()).unwrap();
// Recompute the Merkle tree root.
//
// Convert shards back to type `T` for tree construction.
let mut shards_t: Vec<T> = Vec::new();
for l in leaf_values.iter() {
if let Some(ref v) = *l {
let s = Vec::into(v.to_vec());
shards_t.push(s);
}
}
// Construct the Merkle tree.
let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards_t);
// If the root hash of the reconstructed tree does not match the one
// received with proofs then abort.
if *mtree.root_hash() != *root_hash {
// NOTE: The paper does not define the meaning of *abort*. But it is
// sensible not to continue trying to reconstruct the tree after this
// point. This instance must have received incorrect shards.
Err(BroadcastError::RootHashMismatch)
}
else {
// Reconstruct the value from the data shards.
Ok(glue_shards(mtree, data_shard_num))
}
}
/// Concatenates the first `n` leaf values of a Merkle tree `m` in one value of
/// type `T`. This is useful for reconstructing the data value held in the tree
/// and forgetting the leaves that contain parity information.
fn glue_shards<T>(m: MerkleTree<T>, n: usize) -> T
where T: From<Vec<u8>>, Vec<u8>: From<T>
{
let mut t: Vec<u8> = Vec::new();
let mut i = 0;
for s in m.into_iter() {
i += 1;
if i > n {
break;
}
for b in Vec::from(s).into_iter() {
t.push(b);
}
}
Vec::into(t)
}
/// An additional path conversion operation on `Lemma` to allow reconstruction

View File

@ -34,6 +34,7 @@
//! the consensus `result` is not an error then every successfully terminated
//! consensus node will be the same `result`.
#![feature(optin_builtin_traits)]
#[macro_use]
extern crate error_chain;
#[macro_use]

View File

@ -1,5 +1,4 @@
//! Construction of messages from protobuf buffers.
#![feature(optin_builtin_traits)]
pub mod message;
use std::marker::{Send, Sync};