broadcast ECHO message handler and mutable access solution for writeable state of broadcast

This commit is contained in:
Vladimir Komendantskiy 2018-04-25 20:41:46 +01:00
parent c7020e7b6a
commit 8cd1f4748d
3 changed files with 161 additions and 29 deletions

View File

@ -2,7 +2,7 @@
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use crossbeam;
use proto::*;
use std::marker::{Send, Sync};
@ -13,31 +13,61 @@ use reed_solomon_erasure::ReedSolomon;
use crossbeam_channel::{Sender, Receiver, SendError, RecvError};
use messaging::{Target, TargetedMessage, SourcedMessage,
NodeUid, ProposedValue, QMessage, MessageLoopState,
NodeUid, Algorithm, ProposedValue, QMessage, MessageLoopState,
Handler, LocalMessage, RemoteMessage, AlgoMessage,
RemoteNode};
use messaging;
pub struct Broadcast {
uid: NodeUid,
num_nodes: usize,
struct BroadcastState {
root_hash: Option<Vec<u8>>,
leaf_values: Vec<Option<Box<[u8]>>>,
leaf_values_num: usize,
echo_num: usize,
readys: HashMap<Vec<u8>, usize>,
ready_sent: bool,
ready_to_decode: bool,
}
pub struct Broadcast {
uid: NodeUid,
num_nodes: usize,
num_faulty_nodes: usize,
parity_shard_num: usize,
data_shard_num: usize,
coding: ReedSolomon,
/// Mutable state
state: RwLock<BroadcastState>
}
impl Broadcast {
pub fn new(uid: NodeUid, num_nodes: usize) -> Self {
Broadcast {
pub fn new(uid: NodeUid, num_nodes: usize) -> Result<Self, Error> {
let num_faulty_nodes = (num_nodes - 1) / 3;
let parity_shard_num = 2 * num_faulty_nodes;
let data_shard_num = num_nodes - parity_shard_num;
let coding = ReedSolomon::new(data_shard_num, parity_shard_num)?;
Ok(Broadcast {
uid,
num_nodes,
root_hash: None,
leaf_values: vec![None; num_nodes],
leaf_values_num: 0
}
num_faulty_nodes: num_faulty_nodes,
parity_shard_num: parity_shard_num,
data_shard_num: data_shard_num,
coding: coding,
state: RwLock::new(BroadcastState {
root_hash: None,
leaf_values: vec![None; num_nodes],
leaf_values_num: 0,
echo_num: 0,
readys: HashMap::new(),
ready_sent: false,
ready_to_decode: false,
})
})
}
pub fn handle<E>(&mut self, m: QMessage, tx: Sender<QMessage>) ->
// The message-driven interface function for calls from the main message
// loop.
pub fn on_message<E>(&self, m: QMessage, tx: Sender<QMessage>) ->
Result<MessageLoopState, E>
where E: From<Error> + From<messaging::Error>
{
@ -59,26 +89,33 @@ impl Broadcast {
node: RemoteNode::Node(uid),
message
}) => {
let mut state = self.state.write().unwrap();
let no_outgoing =
Ok(MessageLoopState::Processing(VecDeque::new()));
// A value received. Record the value and multicast an echo.
if let Message::Broadcast(b) = message { match b {
BroadcastMessage::Value(p) => {
if uid != self.uid {
// Ignore value messages from unrelated remote nodes.
Ok(MessageLoopState::Processing(VecDeque::new()))
no_outgoing
}
else {
if let None = self.root_hash {
self.root_hash = Some(p.root_hash.clone());
// Initialise the root hash if not already
// initialised.
if let None = state.root_hash {
state.root_hash = Some(p.root_hash.clone());
debug!("Node {} Value root hash {:?}",
self.uid, HexBytes(&p.root_hash));
}
if let &Some(ref h) = &self.root_hash {
if let Some(ref h) = state.root_hash.clone() {
if p.validate(h.as_slice()) {
// Save the leaf value for reconstructing
// the tree later.
self.leaf_values[index_of_proof(&p)] =
state.leaf_values[index_of_proof(&p)] =
Some(p.value.clone().into_boxed_slice());
self.leaf_values_num += 1;
state.leaf_values_num += 1;
}
}
@ -93,9 +130,86 @@ impl Broadcast {
}
},
_ => Err(Error::NotImplemented).map_err(E::from)
}
}
// An echo received. Verify the proof it contains.
BroadcastMessage::Echo(p) => {
if let None = state.root_hash {
if uid == self.uid {
state.root_hash = Some(p.root_hash.clone());
debug!("Node {} Echo root hash {:?}",
self.uid, state.root_hash);
}
}
// Call validate with the root hash as argument.
if let Some(ref h) = state.root_hash.clone() {
if p.validate(h.as_slice()) {
state.echo_num += 1;
// Save the leaf value for reconstructing the
// tree later.
state.leaf_values[index_of_proof(&p)] =
Some(p.value.clone().into_boxed_slice());
state.leaf_values_num += 1;
// Upon receiving 2f + 1 matching READY(h)
// messages, wait for N 2 f ECHO messages,
// then decode v. Return the decoded v
if state.ready_to_decode &&
state.leaf_values_num >=
self.num_nodes - 2 * self.num_faulty_nodes
{
let value =
decode_from_shards(&mut state.leaf_values,
&self.coding,
self.data_shard_num,
h)?;
tx.send(QMessage::Local(LocalMessage {
dst: Algorithm::CommonSubset,
message: AlgoMessage::Broadcast(value)
})).map_err(Error::from)?;
no_outgoing
}
else if state.leaf_values_num >=
self.num_nodes - self.num_faulty_nodes
{
let result: Result<ProposedValue, Error> =
decode_from_shards(&mut state.leaf_values,
&self.coding,
self.data_shard_num,
h);
match result { Ok(_) => {
// if Ready has not yet been sent,
// multicast Ready
if !state.ready_sent {
state.ready_sent = true;
Ok(MessageLoopState::Processing(
VecDeque::from(vec![RemoteMessage {
node: RemoteNode::All,
message: Message::Broadcast(
BroadcastMessage::Ready(
h.to_owned()))
}])
))
} else { no_outgoing }
}, Err(e) => Err(E::from(e)) }
} else { no_outgoing }
}
else {
debug!("Broadcast/{} cannot validate Echo {:?}",
self.uid, p);
no_outgoing
}
}
else {
error!("Broadcast/{} root hash not initialised",
self.uid);
no_outgoing
}
},
_ => Err(Error::NotImplemented).map_err(E::from)
}}
else {
Err(Error::UnexpectedMessage).map_err(E::from)
}
@ -106,12 +220,12 @@ impl Broadcast {
}
}
impl<E> Handler<E> for Broadcast
impl<'a, E> Handler<E> for Broadcast
where E: From<Error> + From<messaging::Error> {
fn handle(&self, m: QMessage, tx: Sender<QMessage>) ->
Result<MessageLoopState, E>
{
self.handle(m, tx)
self.on_message(m, tx)
}
}
@ -193,7 +307,8 @@ pub enum Error {
Threading,
ProofConstructionFailed,
ReedSolomon(rse::Error),
Send(SendError<TargetedMessage<ProposedValue>>),
Send(SendError<QMessage>),
SendDeprecated(SendError<TargetedMessage<ProposedValue>>),
Recv(RecvError),
UnexpectedMessage,
NotImplemented
@ -204,9 +319,15 @@ impl From<rse::Error> for Error
fn from(err: rse::Error) -> Error { Error::ReedSolomon(err) }
}
impl From<SendError<QMessage>> for Error
{
fn from(err: SendError<QMessage>) -> Error { Error::Send(err) }
}
impl From<SendError<TargetedMessage<ProposedValue>>> for Error
{
fn from(err: SendError<TargetedMessage<ProposedValue>>) -> Error { Error::Send(err) }
fn from(err: SendError<TargetedMessage<ProposedValue>>) ->
Error { Error::SendDeprecated(err) }
}
impl From<RecvError> for Error

View File

@ -41,6 +41,7 @@ impl Iterator for Algorithm {
pub type ProposedValue = Vec<u8>;
/// Kinds of messages sent between algorithm instances.
#[derive(Clone)]
pub enum AlgoMessage {
/// Asynchronous common subset input.
CommonSubsetInput(ProposedValue),
@ -53,6 +54,7 @@ pub enum AlgoMessage {
}
/// A message sent between algorithm instances.
#[derive(Clone)]
pub struct LocalMessage {
/// Identifier of the message destination algorithm.
pub dst: Algorithm,
@ -81,6 +83,7 @@ pub struct RemoteMessage {
}
/// The union type of local and remote messages.
#[derive(Clone)]
pub enum QMessage {
Local(LocalMessage),
Remote(RemoteMessage)
@ -213,7 +216,9 @@ where HandlerError: 'a + From<Error>
message
}) => {
// FIXME: error handling
if let Some(handler) = self.algos.read().unwrap().get(&dst) {
if let Some(mut handler) =
self.algos.write().unwrap().get_mut(&dst)
{
let mut new_result =
handler.handle(QMessage::Local(
LocalMessage {
@ -245,7 +250,7 @@ where HandlerError: 'a + From<Error>
// to result.
//
// FIXME: error handling
self.algos.read().unwrap().iter()
self.algos.write().unwrap().iter_mut()
.fold(Ok(state),
|result1, (_, handler)| {
if let Ok(mut state1) = result1 {

View File

@ -157,8 +157,14 @@ fn create_test_nodes(num_nodes: usize,
let mut broadcast_instances = HashMap::new();
for k in 0..num_nodes {
let them_uid = node_addr(k);
broadcast_instances.insert(them_uid, Broadcast::new(them_uid,
num_nodes));
match Broadcast::new(them_uid, num_nodes) {
Ok(instance) => {
broadcast_instances.insert(them_uid, instance);
},
Err(e) => {
panic!("{:?}", e);
}
}
}
nodes.insert(uid, (TestNode::new(uid, num_nodes, txs, rxs, Some(value)),