mirror of https://github.com/poanetwork/hbbft.git
transitioning broadcast stage to broadcast instance, i.e. simplifying for a single root hash
This commit is contained in:
parent
eb3bbbdd4f
commit
468cf90076
|
@ -4,6 +4,7 @@ use std::hash::Hash;
|
|||
use std::collections::{HashSet, HashMap};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::mpsc;
|
||||
//use std::rc::Rc;
|
||||
use spmc;
|
||||
use crossbeam;
|
||||
use proto::*;
|
||||
|
@ -37,13 +38,6 @@ pub struct Stage<T: Send + Sync> {
|
|||
pub tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
|
||||
/// The receive side of the multiple producer channel from comms threads.
|
||||
pub rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
|
||||
/// Messages of type Value received so far.
|
||||
pub values: HashSet<Proof<T>>,
|
||||
/// Messages of type Echo received so far.
|
||||
pub echos: HashSet<Proof<T>>,
|
||||
/// Messages of type Ready received so far. That is, the root hashes in
|
||||
/// those messages.
|
||||
pub readys: HashMap<Vec<u8>, usize>,
|
||||
/// Value to be broadcast
|
||||
pub broadcast_value: Option<T>
|
||||
}
|
||||
|
@ -60,9 +54,6 @@ where Vec<u8>: From<T>
|
|||
Stage {
|
||||
tx: tx,
|
||||
rx: rx,
|
||||
values: Default::default(),
|
||||
echos: Default::default(),
|
||||
readys: Default::default(),
|
||||
broadcast_value: broadcast_value
|
||||
}
|
||||
}
|
||||
|
@ -79,9 +70,6 @@ where Vec<u8>: From<T>
|
|||
// reason). A `Mutex` is used to grant write access.
|
||||
let rx = self.rx.to_owned();
|
||||
let tx = self.tx.to_owned();
|
||||
let values = Arc::new(Mutex::new(self.values.to_owned()));
|
||||
let echos = Arc::new(Mutex::new(self.echos.to_owned()));
|
||||
let readys = Arc::new(Mutex::new(self.readys.to_owned()));
|
||||
let final_value: Option<T> = None;
|
||||
let final_value_r = Arc::new(Mutex::new(None));
|
||||
let bvalue = self.broadcast_value.to_owned();
|
||||
|
@ -89,7 +77,7 @@ where Vec<u8>: From<T>
|
|||
crossbeam::scope(|scope| {
|
||||
scope.spawn(move || {
|
||||
*final_value_r.lock().unwrap() =
|
||||
inner_run(tx, rx, values, echos, readys, bvalue);
|
||||
inner_run(tx, rx, bvalue);
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -163,18 +151,11 @@ where T: Clone + Debug + Send + Sync + Into<Vec<u8>>
|
|||
/// The main loop of the broadcast task.
|
||||
fn inner_run<T>(tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
|
||||
rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
|
||||
values: Arc<Mutex<HashSet<Proof<T>>>>,
|
||||
echos: Arc<Mutex<HashSet<Proof<T>>>>,
|
||||
readys: Arc<Mutex<HashMap<Vec<u8>, usize>>>,
|
||||
broadcast_value: Option<T>) -> Option<T>
|
||||
where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
|
||||
+ From<Vec<u8>> + AsRef<[u8]>
|
||||
, Vec<u8>: From<T>
|
||||
{
|
||||
// return value
|
||||
let reconstructed_value: Option<T> = None;
|
||||
// Ready sent flags
|
||||
let mut ready_sent: HashSet<Vec<u8>> = Default::default();
|
||||
// Erasure coding scheme: N - 2f value shards and 2f parity shards
|
||||
let parity_shard_num = 2 * PLACEHOLDER_F;
|
||||
let data_shard_num = PLACEHOLDER_N - parity_shard_num;
|
||||
|
@ -189,6 +170,22 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
|
|||
send_shards(v, tx.clone(), &coding, data_shard_num, parity_shard_num);
|
||||
}
|
||||
|
||||
// currently known leaf values
|
||||
let mut leaf_values: Vec<Option<Box<[u8]>>> =
|
||||
vec![None; PLACEHOLDER_N];
|
||||
// number of non-None leaf values
|
||||
let mut leaf_values_num = 0;
|
||||
// return value
|
||||
let reconstructed_value: Option<T> = None;
|
||||
// Write-once root hash of a tree broadcast from the sender associated with
|
||||
// this instance.
|
||||
let mut root_hash: Option<Vec<u8>> = None;
|
||||
// Number of times Echo was received with the same root hash.
|
||||
let mut echo_num = 0;
|
||||
// Number of times Ready was received with the same root hash.
|
||||
let mut ready_num = 0;
|
||||
let mut ready_sent = false;
|
||||
|
||||
// TODO: handle exit conditions
|
||||
while reconstructed_value == None {
|
||||
// Receive a message from the socket IO task.
|
||||
|
@ -200,7 +197,21 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
|
|||
// TODO: determine if the paper treats multicast as reflexive and
|
||||
// add an echo to this node if it does.
|
||||
BroadcastMessage::Value(p) => {
|
||||
values.lock().unwrap().insert(p.clone());
|
||||
if let None = root_hash {
|
||||
root_hash = Some(p.root_hash.clone());
|
||||
}
|
||||
|
||||
if let &Some(ref h) = &root_hash {
|
||||
if p.validate(h.as_slice()) {
|
||||
// Save the leaf value for reconstructing the tree
|
||||
// later.
|
||||
leaf_values[index_of_proof(&p)] =
|
||||
Some(Vec::from(p.value.clone())
|
||||
.into_boxed_slice());
|
||||
leaf_values_num = leaf_values_num + 1;
|
||||
}
|
||||
}
|
||||
// Broadcast an echo of this proof.
|
||||
tx.lock().unwrap()
|
||||
.send(Message::Broadcast(
|
||||
BroadcastMessage::Echo(p)))
|
||||
|
@ -209,49 +220,56 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
|
|||
|
||||
// An echo received. Verify the proof it contains.
|
||||
BroadcastMessage::Echo(p) => {
|
||||
let root_hash = p.root_hash.clone();
|
||||
//let echos = echos.lock().unwrap();
|
||||
if p.validate(root_hash.as_slice()) {
|
||||
echos.lock().unwrap().insert(p.clone());
|
||||
if let None = root_hash {
|
||||
root_hash = Some(p.root_hash.clone());
|
||||
}
|
||||
|
||||
// Upon receiving valid echos for the same root hash
|
||||
// from N - f distinct parties, try to interpolate the
|
||||
// Merkle tree.
|
||||
//
|
||||
// TODO: eliminate this iteration
|
||||
let mut echo_n = 0;
|
||||
for echo in echos.lock().unwrap().iter() {
|
||||
if echo.root_hash == root_hash {
|
||||
echo_n += 1;
|
||||
}
|
||||
}
|
||||
// call validate with the root hash as argument
|
||||
if let &Some(ref h) = &root_hash {
|
||||
if p.validate(h.as_slice()) {
|
||||
echo_num += 1;
|
||||
// Save the leaf value for reconstructing the tree
|
||||
// later.
|
||||
leaf_values[index_of_proof(&p)] =
|
||||
Some(Vec::from(p.value.clone())
|
||||
.into_boxed_slice());
|
||||
leaf_values_num = leaf_values_num + 1;
|
||||
|
||||
if echo_n >= PLACEHOLDER_N - PLACEHOLDER_F {
|
||||
// Try to interpolate the Merkle tree using the
|
||||
// Reed-Solomon erasure coding scheme.
|
||||
//
|
||||
// FIXME: indicate the missing leaves with None
|
||||
if leaf_values_num >= PLACEHOLDER_N - PLACEHOLDER_F {
|
||||
// Try to interpolate the Merkle tree using the
|
||||
// Reed-Solomon erasure coding scheme.
|
||||
|
||||
let mut leaves: Vec<Option<Box<[u8]>>> = Vec::new();
|
||||
// TODO: optimise this loop out as well
|
||||
for echo in
|
||||
echos.lock().unwrap().iter()
|
||||
{
|
||||
if echo.root_hash == root_hash {
|
||||
leaves.push(Some(
|
||||
Box::from(echo.value.clone().into())));
|
||||
coding.reconstruct_shards(leaf_values
|
||||
.as_mut_slice())
|
||||
.unwrap();
|
||||
|
||||
// Recompute the Merkle tree root.
|
||||
//
|
||||
// Convert shards back to type `T` for tree
|
||||
// construction.
|
||||
let mut shards_t: Vec<T> = Vec::new();
|
||||
for l in leaf_values.iter() {
|
||||
if let Some(ref v) = *l {
|
||||
let s = Vec::into(v.to_vec());
|
||||
shards_t.push(s);
|
||||
}
|
||||
}
|
||||
// Construct the Merkle tree.
|
||||
let mtree = MerkleTree::from_vec(
|
||||
&::ring::digest::SHA256, shards_t);
|
||||
// If the root hash of the reconstructed tree
|
||||
// does not match the one received with proofs
|
||||
// then abort.
|
||||
if *mtree.root_hash() != *h {
|
||||
break;
|
||||
}
|
||||
}
|
||||
coding.reconstruct_shards(leaves.as_mut_slice())
|
||||
.unwrap();
|
||||
|
||||
// FIXME: Recompute Merkle tree root.
|
||||
|
||||
// if Ready has not yet been sent, multicast Ready
|
||||
if let None = ready_sent.get(&root_hash) {
|
||||
ready_sent.insert(root_hash.clone());
|
||||
if !ready_sent {
|
||||
ready_sent = true;
|
||||
tx.lock().unwrap().send(Message::Broadcast(
|
||||
BroadcastMessage::Ready(root_hash)))
|
||||
BroadcastMessage::Ready(h.to_owned())))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
@ -259,35 +277,33 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
|
|||
},
|
||||
|
||||
BroadcastMessage::Ready(ref h) => {
|
||||
// Number of times Ready(h) was received.
|
||||
let ready_n;
|
||||
if let Some(n) = readys.lock().unwrap().get_mut(h) {
|
||||
*n = *n + 1;
|
||||
ready_n = *n;
|
||||
}
|
||||
else {
|
||||
//
|
||||
readys.lock().unwrap().insert(h.clone(), 1);
|
||||
ready_n = 1;
|
||||
// TODO: Prioritise the Value root hash, possibly. Prevent
|
||||
// an incorrect node from blocking progress which it could
|
||||
// achieve by sending an incorrect hash.
|
||||
if let None = root_hash {
|
||||
root_hash = Some(h.clone());
|
||||
}
|
||||
// Check that the root hash matches.
|
||||
if let &Some(ref h) = &root_hash {
|
||||
ready_num += 1;
|
||||
|
||||
// Upon receiving f + 1 matching Ready(h) messages, if Ready
|
||||
// has not yet been sent, multicast Ready(h).
|
||||
if (ready_n == PLACEHOLDER_F + 1) &&
|
||||
(ready_sent.get(h) == None)
|
||||
{
|
||||
tx.lock().unwrap().send(Message::Broadcast(
|
||||
BroadcastMessage::Ready(h.to_vec()))).unwrap();
|
||||
}
|
||||
// Upon receiving f + 1 matching Ready(h) messages, if
|
||||
// Ready has not yet been sent, multicast Ready(h).
|
||||
if (ready_num == PLACEHOLDER_F + 1) &&
|
||||
!ready_sent
|
||||
{
|
||||
tx.lock().unwrap().send(Message::Broadcast(
|
||||
BroadcastMessage::Ready(h.to_vec()))).unwrap();
|
||||
}
|
||||
|
||||
// Upon receiving 2f + 1 matching Ready(h) messages, wait
|
||||
// for N − 2f Echo messages, then decode v.
|
||||
if (ready_n > 2 * PLACEHOLDER_F) &&
|
||||
(reconstructed_value == None) &&
|
||||
(echos.lock().unwrap().len() >=
|
||||
PLACEHOLDER_N - 2 * PLACEHOLDER_F)
|
||||
{
|
||||
// FIXME: decode v
|
||||
// Upon receiving 2f + 1 matching Ready(h) messages,
|
||||
// wait for N − 2f Echo messages, then decode v.
|
||||
if (ready_num > 2 * PLACEHOLDER_F) &&
|
||||
(reconstructed_value == None) &&
|
||||
(echo_num >= PLACEHOLDER_N - 2 * PLACEHOLDER_F)
|
||||
{
|
||||
// FIXME: decode v
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -303,7 +319,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
|
|||
/// An additional path conversion operation on `Lemma` to allow reconstruction
|
||||
/// of erasure-coded `Proof` from `Lemma`s. The output path, when read from left
|
||||
/// to right, goes from leaf to root (LSB order).
|
||||
pub fn lemma_to_path(lemma: &Lemma) -> Vec<bool> {
|
||||
fn path_of_lemma(lemma: &Lemma) -> Vec<bool> {
|
||||
match lemma.sub_lemma {
|
||||
None => {
|
||||
match lemma.sibling_hash {
|
||||
|
@ -316,7 +332,7 @@ pub fn lemma_to_path(lemma: &Lemma) -> Vec<bool> {
|
|||
}
|
||||
}
|
||||
Some(ref l) => {
|
||||
let mut p = lemma_to_path(l.as_ref());
|
||||
let mut p = path_of_lemma(l.as_ref());
|
||||
|
||||
match lemma.sibling_hash {
|
||||
// lemma terminates
|
||||
|
@ -332,7 +348,7 @@ pub fn lemma_to_path(lemma: &Lemma) -> Vec<bool> {
|
|||
}
|
||||
|
||||
/// Further conversion of a binary tree path into an array index.
|
||||
pub fn path_to_index(mut path: Vec<bool>) -> usize {
|
||||
fn index_of_path(mut path: Vec<bool>) -> usize {
|
||||
let mut idx = 0;
|
||||
// Convert to the MSB order.
|
||||
path.reverse();
|
||||
|
@ -347,3 +363,8 @@ pub fn path_to_index(mut path: Vec<bool>) -> usize {
|
|||
}
|
||||
idx
|
||||
}
|
||||
|
||||
/// Computes the Merkle tree leaf index of a value in a given proof.
|
||||
fn index_of_proof<T>(p: &Proof<T>) -> usize {
|
||||
index_of_path(path_of_lemma(&p.lemma))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue