added draft responses to Ready messages, started tree interpolation

This commit is contained in:
Vladimir Komendantskiy 2018-03-23 22:54:40 +00:00
parent 1cdec3c39b
commit cd98cd3bab
1 changed files with 145 additions and 33 deletions

View File

@ -1,7 +1,7 @@
//! Reliable broadcast algorithm.
use std::fmt::Debug;
use std::hash::Hash;
use std::collections::HashSet;
use std::collections::{HashSet, HashMap};
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use spmc;
@ -9,18 +9,19 @@ use crossbeam;
use proto::*;
use std::marker::{Send, Sync};
use merkle::*;
use merkle::proof::*;
use reed_solomon_erasure::*;
// Temporary placeholders for the number of participants and the maximum
// envisaged number of faulty nodes. Only one is required since N >= 3f +
// 1. There are at least two options for where should N and f come from:
//
// - start-up parameters
//
// - initial socket setup phase in node.rs
//
const PLACEHOLDER_N: usize = 10;
const PLACEHOLDER_F: usize = 3;
/// Temporary placeholders for the number of participants and the maximum
/// envisaged number of faulty nodes. Only one is required since N >= 3f +
/// 1. There are at least two options for where should N and f come from:
///
/// - start-up parameters
///
/// - initial socket setup phase in node.rs
///
const PLACEHOLDER_N: usize = 8;
const PLACEHOLDER_F: usize = 2;
pub struct Stage<T: Send + Sync> {
/// The transmit side of the multiple consumer channel to comms threads.
@ -33,7 +34,7 @@ pub struct Stage<T: Send + Sync> {
pub echos: HashSet<Proof<T>>,
/// Messages of type Ready received so far. That is, the root hashes in
/// those messages.
pub readys: HashSet<Vec<u8>>
pub readys: HashMap<Vec<u8>, usize>
}
impl<T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>> Stage<T> {
@ -48,36 +49,57 @@ impl<T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>> Stage<T> {
}
}
/// Broadcast stage main loop returning the computed values in case of
/// success, and an error in case of failure.
pub fn run(&mut self) -> Result<Vec<T>, ()> {
/// Broadcast stage task returning the computed values in case of success,
/// and an error in case of failure.
///
/// TODO: Detailed error status.
pub fn run(&mut self) -> Result<T, ()> {
// Manager thread.
//
// rx cannot be cloned due to its type constraint but can be used
// inside a thread with the help of an `Arc` (`Rc` wouldn't
// work for the same reason).
let rx = self.rx.clone();
let tx = self.tx.clone();
// rx cannot be cloned due to its type constraint but can be used inside
// a thread with the help of an `Arc` (`Rc` wouldn't work for the same
// reason). A `Mutex` is used to grant write access.
let rx = self.rx.to_owned();
let tx = self.tx.to_owned();
let values = Arc::new(Mutex::new(self.values.to_owned()));
let echos = Arc::new(Mutex::new(self.echos.to_owned()));
let readys = Arc::new(Mutex::new(self.readys.to_owned()));
let tree_value: Option<T> = None;
let tree_value_r = Arc::new(Mutex::new(None));
crossbeam::scope(|scope| {
scope.spawn(move || {
inner_run(tx, rx, values, echos);
*tree_value_r.lock().unwrap() =
inner_run(tx, rx, values, echos, readys);
});
});
// TODO
Err(())
match tree_value {
None => Err(()),
Some(v) => Ok(v)
}
}
}
/// The main loop of the broadcast task.
///
/// TODO: If possible, allow for multiple broadcast senders (not found in the
/// paper): Return decoded values of multiple trees. Don't just settle on the
/// first decoded value.
fn inner_run<T>(tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
values: Arc<Mutex<HashSet<Proof<T>>>>,
echos: Arc<Mutex<HashSet<Proof<T>>>>)
echos: Arc<Mutex<HashSet<Proof<T>>>>,
readys: Arc<Mutex<HashMap<Vec<u8>, usize>>>) -> Option<T>
where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
{
// return value
let tree_value: Option<T> = None;
// Ready sent flags
let mut ready_sent: HashSet<Vec<u8>> = Default::default();
// TODO: handle exit conditions
loop {
while tree_value == None {
// Receive a message from the socket IO task.
let message = rx.lock().unwrap().recv().unwrap();
if let Message::Broadcast(message) = message {
@ -106,18 +128,18 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
// Merkle tree.
//
// TODO: eliminate this iteration
let mut parties = 0;
let mut echo_n = 0;
for echo in echos.lock().unwrap().iter() {
if echo.root_hash == root_hash {
parties += 1;
echo_n += 1;
}
}
if parties >= PLACEHOLDER_N - PLACEHOLDER_F {
if echo_n >= PLACEHOLDER_N - PLACEHOLDER_F {
// Try to interpolate the Merkle tree using the
// Reed-Solomon erasure coding scheme
// Reed-Solomon erasure coding scheme.
//
// TODO: indicate the missing leaves with None
// FIXME: indicate the missing leaves with None
let mut leaves: Vec<Option<Box<[u8]>>> = Vec::new();
// TODO: optimise this loop out as well
@ -126,7 +148,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
{
if echo.root_hash == root_hash {
leaves.push(Some(
(Box::from(echo.value.clone().into()))));
Box::from(echo.value.clone().into())));
}
}
let coding = ReedSolomon::new(
@ -134,11 +156,52 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
2 * PLACEHOLDER_F).unwrap();
coding.reconstruct_shards(leaves.as_mut_slice())
.unwrap();
// FIXME: Recompute Merkle tree root.
// if Ready has not yet been sent, multicast Ready
if let None = ready_sent.get(&root_hash) {
ready_sent.insert(root_hash.clone());
tx.lock().unwrap().send(Message::Broadcast(
BroadcastMessage::Ready(root_hash)))
.unwrap();
}
}
// TODO
}
},
_ => unimplemented!()
BroadcastMessage::Ready(ref h) => {
// Number of times Ready(h) was received.
let ready_n;
if let Some(n) = readys.lock().unwrap().get_mut(h) {
*n = *n + 1;
ready_n = *n;
}
else {
//
readys.lock().unwrap().insert(h.clone(), 1);
ready_n = 1;
}
// Upon receiving f + 1 matching Ready(h) messages, if Ready
// has not yet been sent, multicast Ready(h).
if (ready_n == PLACEHOLDER_F + 1) &&
(ready_sent.get(h) == None)
{
tx.lock().unwrap().send(Message::Broadcast(
BroadcastMessage::Ready(h.to_vec()))).unwrap();
}
// Upon receiving 2f + 1 matching Ready(h) messages, wait
// for N 2f Echo messages, then decode v.
if (ready_n > 2 * PLACEHOLDER_F) &&
(tree_value == None) &&
(echos.lock().unwrap().len() >=
PLACEHOLDER_N - 2 * PLACEHOLDER_F)
{
// FIXME: decode v
}
}
}
}
else {
@ -146,4 +209,53 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
message);
}
}
return tree_value;
}
/// An additional path conversion operation on `Lemma` to allow reconstruction
/// of erasure-coded `Proof` from `Lemma`s. The output path, when read from left
/// to right, goes from leaf to root (LSB order).
pub fn lemma_to_path(lemma: &Lemma) -> Vec<bool> {
match lemma.sub_lemma {
None => {
match lemma.sibling_hash {
// lemma terminates with no leaf
None => vec![],
// the leaf is on the right
Some(Positioned::Left(_)) => vec![true],
// the leaf is on the left
Some(Positioned::Right(_)) => vec![false],
}
}
Some(ref l) => {
let mut p = lemma_to_path(l.as_ref());
match lemma.sibling_hash {
// lemma terminates
None => (),
// lemma branches out to the right
Some(Positioned::Left(_)) => p.push(true),
// lemma branches out to the left
Some(Positioned::Right(_)) => p.push(false),
}
p
}
}
}
/// Further conversion of a binary tree path into an array index.
pub fn path_to_index(mut path: Vec<bool>) -> usize {
let mut idx = 0;
// Convert to the MSB order.
path.reverse();
for &dir in path.iter() {
if dir == false {
idx = idx << 1;
}
else {
idx = (idx << 1) | 1;
}
}
idx
}