From d8c57280d34e8991711ce572618b2c51540e10c3 Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Thu, 29 Mar 2018 17:23:02 +0100 Subject: [PATCH] decoder of broadcast value from Merkle tree leaves --- src/broadcast/mod.rs | 173 +++++++++++++++++++++++++++++-------------- src/lib.rs | 1 + src/proto/mod.rs | 1 - 3 files changed, 118 insertions(+), 57 deletions(-) diff --git a/src/broadcast/mod.rs b/src/broadcast/mod.rs index 779b285..bbe3ef7 100644 --- a/src/broadcast/mod.rs +++ b/src/broadcast/mod.rs @@ -1,10 +1,8 @@ //! Reliable broadcast algorithm. use std::fmt::Debug; use std::hash::Hash; -use std::collections::{HashSet, HashMap}; use std::sync::{Arc, Mutex}; use std::sync::mpsc; -//use std::rc::Rc; use spmc; use crossbeam; use proto::*; @@ -62,7 +60,7 @@ where Vec: From /// and an error in case of failure. /// /// TODO: Detailed error status. - pub fn run(&mut self) -> Result { + pub fn run(&mut self) -> Result { // Broadcast state machine thread. // // rx cannot be cloned due to its type constraint but can be used inside @@ -70,24 +68,31 @@ where Vec: From // reason). A `Mutex` is used to grant write access. let rx = self.rx.to_owned(); let tx = self.tx.to_owned(); - let final_value: Option = None; - let final_value_r = Arc::new(Mutex::new(None)); + let final_value: Option> = None; + let final_value_r = Arc::new(Mutex::new(final_value.clone())); + let final_value_r_scoped = final_value_r.clone(); let bvalue = self.broadcast_value.to_owned(); crossbeam::scope(|scope| { scope.spawn(move || { - *final_value_r.lock().unwrap() = - inner_run(tx, rx, bvalue); + *final_value_r_scoped.lock().unwrap() = + Some(inner_run(tx, rx, bvalue)); }); }); - match final_value { - None => Err(()), - Some(v) => Ok(v) + Some(ref r) => r.clone(), + None => Err(BroadcastError::Threading) } } } +/// Errors returned by the broadcast instance. +#[derive(Debug, Clone, PartialEq)] +pub enum BroadcastError { + RootHashMismatch, + Threading +} + /// Breaks the input value into shards of equal length and encodes them -- and /// some extra parity shards -- with a Reed-Solomon erasure coding scheme. fn send_shards(value: T, @@ -151,7 +156,7 @@ where T: Clone + Debug + Send + Sync + Into> /// The main loop of the broadcast task. fn inner_run(tx: Arc>>>, rx: Arc>>>, - broadcast_value: Option) -> Option + broadcast_value: Option) -> Result where T: Clone + Debug + Eq + Hash + Send + Sync + Into> + From> + AsRef<[u8]> , Vec: From @@ -171,12 +176,11 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into> } // currently known leaf values - let mut leaf_values: Vec>> = - vec![None; PLACEHOLDER_N]; + let mut leaf_values: Vec>> = vec![None; PLACEHOLDER_N]; // number of non-None leaf values let mut leaf_values_num = 0; // return value - let reconstructed_value: Option = None; + let mut result: Option> = None; // Write-once root hash of a tree broadcast from the sender associated with // this instance. let mut root_hash: Option> = None; @@ -185,9 +189,10 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into> // Number of times Ready was received with the same root hash. let mut ready_num = 0; let mut ready_sent = false; + let mut ready_to_decode = false; // TODO: handle exit conditions - while reconstructed_value == None { + while result == None { // Receive a message from the socket IO task. let message = rx.lock().unwrap().recv().unwrap(); if let Message::Broadcast(message) = message { @@ -235,42 +240,33 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into> .into_boxed_slice()); leaf_values_num = leaf_values_num + 1; - if leaf_values_num >= PLACEHOLDER_N - PLACEHOLDER_F { - // Try to interpolate the Merkle tree using the - // Reed-Solomon erasure coding scheme. - - coding.reconstruct_shards(leaf_values - .as_mut_slice()) - .unwrap(); - - // Recompute the Merkle tree root. - // - // Convert shards back to type `T` for tree - // construction. - let mut shards_t: Vec = Vec::new(); - for l in leaf_values.iter() { - if let Some(ref v) = *l { - let s = Vec::into(v.to_vec()); - shards_t.push(s); - } - } - // Construct the Merkle tree. - let mtree = MerkleTree::from_vec( - &::ring::digest::SHA256, shards_t); - // If the root hash of the reconstructed tree - // does not match the one received with proofs - // then abort. - if *mtree.root_hash() != *h { - break; - } + // upon receiving 2f + 1 matching READY(h) + // messages, wait for N − 2 f ECHO messages, then + // decode v + if ready_to_decode && + leaf_values_num >= + PLACEHOLDER_N - 2 * PLACEHOLDER_F + { + result = Some( + decode_from_shards(&mut leaf_values, + &coding, + data_shard_num, h)); } - - // if Ready has not yet been sent, multicast Ready - if !ready_sent { - ready_sent = true; - tx.lock().unwrap().send(Message::Broadcast( - BroadcastMessage::Ready(h.to_owned()))) - .unwrap(); + else if leaf_values_num >= + PLACEHOLDER_N - PLACEHOLDER_F + { + result = Some( + decode_from_shards(&mut leaf_values, + &coding, + data_shard_num, h)); + // if Ready has not yet been sent, multicast + // Ready + if !ready_sent { + ready_sent = true; + tx.lock().unwrap().send(Message::Broadcast( + BroadcastMessage::Ready(h.to_owned()))) + .unwrap(); + } } } } @@ -298,11 +294,17 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into> // Upon receiving 2f + 1 matching Ready(h) messages, // wait for N − 2f Echo messages, then decode v. - if (ready_num > 2 * PLACEHOLDER_F) && - (reconstructed_value == None) && - (echo_num >= PLACEHOLDER_N - 2 * PLACEHOLDER_F) - { - // FIXME: decode v + if ready_num > 2 * PLACEHOLDER_F { + // Wait for N - 2f Echo messages, then decode v. + if echo_num >= PLACEHOLDER_N - 2 * PLACEHOLDER_F { + result = Some( + decode_from_shards(&mut leaf_values, + &coding, + data_shard_num, h)); + } + else { + ready_to_decode = true; + } } } } @@ -313,7 +315,66 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into> message); } } - return reconstructed_value; + result.unwrap() +} + +fn decode_from_shards(leaf_values: &mut Vec>>, + coding: &ReedSolomon, + data_shard_num: usize, + root_hash: &Vec) -> + Result +where T: AsRef<[u8]> + From>, Vec: From +{ + // Try to interpolate the Merkle tree using the Reed-Solomon erasure coding + // scheme. + coding.reconstruct_shards(leaf_values.as_mut_slice()).unwrap(); + + // Recompute the Merkle tree root. + // + // Convert shards back to type `T` for tree construction. + let mut shards_t: Vec = Vec::new(); + for l in leaf_values.iter() { + if let Some(ref v) = *l { + let s = Vec::into(v.to_vec()); + shards_t.push(s); + } + } + // Construct the Merkle tree. + let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards_t); + // If the root hash of the reconstructed tree does not match the one + // received with proofs then abort. + if *mtree.root_hash() != *root_hash { + // NOTE: The paper does not define the meaning of *abort*. But it is + // sensible not to continue trying to reconstruct the tree after this + // point. This instance must have received incorrect shards. + Err(BroadcastError::RootHashMismatch) + } + else { + // Reconstruct the value from the data shards. + Ok(glue_shards(mtree, data_shard_num)) + } +} + + +/// Concatenates the first `n` leaf values of a Merkle tree `m` in one value of +/// type `T`. This is useful for reconstructing the data value held in the tree +/// and forgetting the leaves that contain parity information. +fn glue_shards(m: MerkleTree, n: usize) -> T +where T: From>, Vec: From +{ + let mut t: Vec = Vec::new(); + let mut i = 0; + + for s in m.into_iter() { + i += 1; + if i > n { + break; + } + for b in Vec::from(s).into_iter() { + t.push(b); + } + } + Vec::into(t) } /// An additional path conversion operation on `Lemma` to allow reconstruction diff --git a/src/lib.rs b/src/lib.rs index a23267b..dae8713 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,6 +34,7 @@ //! the consensus `result` is not an error then every successfully terminated //! consensus node will be the same `result`. +#![feature(optin_builtin_traits)] #[macro_use] extern crate error_chain; #[macro_use] diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 0a59bf0..49b0bbe 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,5 +1,4 @@ //! Construction of messages from protobuf buffers. -#![feature(optin_builtin_traits)] pub mod message; use std::marker::{Send, Sync};