introduced error forwarding in the broadcast module for IO operations

This commit is contained in:
Vladimir Komendantskiy 2018-04-06 17:39:15 +01:00
parent bc071d7a09
commit 99ff3c4863
1 changed files with 43 additions and 23 deletions

View File

@ -7,8 +7,9 @@ use proto::*;
use std::marker::{Send, Sync};
use merkle::MerkleTree;
use merkle::proof::{Proof, Lemma, Positioned};
use reed_solomon_erasure as rse;
use reed_solomon_erasure::ReedSolomon;
use crossbeam_channel::{Sender, Receiver};
use crossbeam_channel::{Sender, Receiver, SendError, RecvError};
use messaging::{Target, TargetedMessage, SourcedMessage};
@ -59,10 +60,10 @@ where Vec<u8>: From<T>
/// Broadcast stage task returning the computed values in case of success,
/// and an error in case of failure.
pub fn run(&mut self) -> Result<T, BroadcastError> {
pub fn run(&mut self) -> Result<T, Error<T>> {
// Broadcast state machine thread.
let bvalue = self.broadcast_value.to_owned();
let result: Result<T, BroadcastError>;
let result: Result<T, Error<T>>;
let result_r = Arc::new(Mutex::new(None));
let result_r_scoped = result_r.clone();
@ -78,18 +79,35 @@ where Vec<u8>: From<T>
result = r.to_owned();
}
else {
result = Err(BroadcastError::Threading);
result = Err(Error::Threading);
}
result
}
}
/// Errors returned by the broadcast instance.
#[derive(Debug, Clone, PartialEq)]
pub enum BroadcastError {
#[derive(Debug, Clone)]
pub enum Error<T: Clone + Debug + Send + Sync> {
RootHashMismatch,
Threading,
ProofConstructionFailed
ProofConstructionFailed,
ReedSolomon(rse::Error),
Send(SendError<TargetedMessage<T>>),
Recv(RecvError)
}
impl<T: Clone + Debug + Send + Sync> From<rse::Error> for Error<T> {
fn from(err: rse::Error) -> Error<T> { Error::ReedSolomon(err) }
}
impl<T: Clone + Debug + Send + Sync> From<SendError<TargetedMessage<T>>>
for Error<T>
{
fn from(err: SendError<TargetedMessage<T>>) -> Error<T> { Error::Send(err) }
}
impl<T: Clone + Debug + Send + Sync> From<RecvError> for Error<T> {
fn from(err: RecvError) -> Error<T> { Error::Recv(err) }
}
/// Breaks the input value into shards of equal length and encodes them -- and
@ -100,7 +118,7 @@ pub enum BroadcastError {
fn send_shards<'a, T>(value: T,
tx: &'a Sender<TargetedMessage<T>>,
coding: &ReedSolomon) ->
Result<Proof<T>, BroadcastError>
Result<Proof<T>, Error<T>>
where T: Clone + Debug + Send + Sync + Into<Vec<u8>>
+ From<Vec<u8>> + AsRef<[u8]>
, Vec<u8>: From<T>
@ -134,7 +152,7 @@ where T: Clone + Debug + Send + Sync + Into<Vec<u8>>
}
// Construct the parity chunks/shards
coding.encode(shards.as_mut_slice()).unwrap();
coding.encode(shards.as_mut_slice())?;
debug!("Shards: {:?}", shards);
@ -150,7 +168,7 @@ where T: Clone + Debug + Send + Sync + Into<Vec<u8>>
let mtree = MerkleTree::from_vec(&::ring::digest::SHA256, shards_t);
// Default result in case of `gen_proof` error.
let mut result = Err(BroadcastError::ProofConstructionFailed);
let mut result = Err(Error::ProofConstructionFailed);
// Send each proof to a node.
for (i, leaf_value) in mtree.iter().cloned().enumerate() {
@ -167,7 +185,7 @@ where T: Clone + Debug + Send + Sync + Into<Vec<u8>>
target: Target::Node(i),
message: Message::Broadcast(
BroadcastMessage::Value(proof))
}).unwrap();
})?;
}
}
}
@ -182,7 +200,7 @@ fn inner_run<'a, T>(tx: &'a Sender<TargetedMessage<T>>,
node_index: usize,
num_nodes: usize,
num_faulty_nodes: usize) ->
Result<T, BroadcastError>
Result<T, Error<T>>
where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
+ From<Vec<u8>> + AsRef<[u8]>
, Vec<u8>: From<T>
@ -190,7 +208,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
// Erasure coding scheme: N - 2f value shards and 2f parity shards
let parity_shard_num = 2 * num_faulty_nodes;
let data_shard_num = num_nodes - parity_shard_num;
let coding = ReedSolomon::new(data_shard_num, parity_shard_num).unwrap();
let coding = ReedSolomon::new(data_shard_num, parity_shard_num)?;
// currently known leaf values
let mut leaf_values: Vec<Option<Box<[u8]>>> = vec![None; num_nodes];
// Write-once root hash of a tree broadcast from the sender associated with
@ -222,7 +240,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
}
// return value
let mut result: Option<Result<T, BroadcastError>> = None;
let mut result: Option<Result<T, Error<T>>> = None;
// Number of times Echo was received with the same root hash.
let mut echo_num = 0;
// Number of times Ready was received with the same root hash.
@ -231,9 +249,9 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
let mut ready_to_decode = false;
// TODO: handle exit conditions
while result == None {
while result.is_none() {
// Receive a message from the socket IO task.
let message = rx.recv().unwrap();
let message = rx.recv()?;
if let SourcedMessage {
source: i,
message: Message::Broadcast(message)
@ -266,7 +284,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
tx.send(TargetedMessage {
target: Target::All,
message: Message::Broadcast(BroadcastMessage::Echo(p))
}).unwrap()
})?
},
// An echo received. Verify the proof it contains.
@ -318,7 +336,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
message: Message::Broadcast(
BroadcastMessage::Ready(
h.to_owned()))
}).unwrap();
})?;
}
}
}
@ -350,7 +368,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
message: Message::Broadcast(
BroadcastMessage::Ready(
h.to_vec()))
}).unwrap();
})?;
}
// Upon receiving 2f + 1 matching Ready(h) messages,
@ -375,6 +393,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
error!("Incorrect message from the socket: {:?}", message);
}
}
// result is not a None, safe to extract value
result.unwrap()
}
@ -382,12 +401,13 @@ fn decode_from_shards<T>(leaf_values: &mut Vec<Option<Box<[u8]>>>,
coding: &ReedSolomon,
data_shard_num: usize,
root_hash: &Vec<u8>) ->
Result<T, BroadcastError>
where T: AsRef<[u8]> + From<Vec<u8>>, Vec<u8>: From<T>
Result<T, Error<T>>
where T: Clone + Debug + Send + Sync + AsRef<[u8]> + From<Vec<u8>>,
Vec<u8>: From<T>
{
// Try to interpolate the Merkle tree using the Reed-Solomon erasure coding
// scheme.
coding.reconstruct_shards(leaf_values.as_mut_slice()).unwrap();
coding.reconstruct_shards(leaf_values.as_mut_slice())?;
// Recompute the Merkle tree root.
//
@ -407,7 +427,7 @@ where T: AsRef<[u8]> + From<Vec<u8>>, Vec<u8>: From<T>
// NOTE: The paper does not define the meaning of *abort*. But it is
// sensible not to continue trying to reconstruct the tree after this
// point. This instance must have received incorrect shards.
Err(BroadcastError::RootHashMismatch)
Err(Error::RootHashMismatch)
}
else {
// Reconstruct the value from the data shards.