Remove output and message queue from Broadcast.

This commit is contained in:
Andreas Fackler 2018-07-24 12:18:09 +02:00
parent 30c5805446
commit 990899327e
1 changed files with 52 additions and 68 deletions

View File

@ -127,7 +127,7 @@
//!# }
//! ```
use std::collections::{BTreeMap, VecDeque};
use std::collections::BTreeMap;
use std::fmt::{self, Debug};
use std::iter::once;
use std::sync::Arc;
@ -139,9 +139,9 @@ use reed_solomon_erasure as rse;
use reed_solomon_erasure::ReedSolomon;
use ring::digest;
use fault_log::{FaultKind, FaultLog};
use fault_log::{Fault, FaultKind};
use fmt::{HexBytes, HexList, HexProof};
use messaging::{self, DistAlgorithm, NetworkInfo, Target, TargetedMessage};
use messaging::{self, DistAlgorithm, NetworkInfo, Target};
error_chain!{
foreign_links {
@ -218,10 +218,6 @@ pub struct Broadcast<NodeUid> {
echos: BTreeMap<NodeUid, Proof<Vec<u8>>>,
/// The root hashes we received via `Ready` messages, by sender ID.
readys: BTreeMap<NodeUid, Vec<u8>>,
/// The outgoing message queue.
messages: VecDeque<TargetedMessage<BroadcastMessage, NodeUid>>,
/// The output, if any.
output: Option<Vec<u8>>,
}
pub type Step<NodeUid> = messaging::Step<Broadcast<NodeUid>>;
@ -242,10 +238,10 @@ impl<NodeUid: Debug + Clone + Ord> DistAlgorithm for Broadcast<NodeUid> {
// Split the value into chunks/shards, encode them with erasure codes.
// Assemble a Merkle tree from data and parity shards. Take all proofs
// from this tree and send them, each to its own node.
let proof = self.send_shards(input)?;
let (proof, mut step) = self.send_shards(input)?;
let our_uid = &self.netinfo.our_uid().clone();
let fault_log = self.handle_value(our_uid, proof)?;
self.step(fault_log)
step.extend(self.handle_value(our_uid, proof)?);
Ok(step)
}
fn handle_message(
@ -256,14 +252,11 @@ impl<NodeUid: Debug + Clone + Ord> DistAlgorithm for Broadcast<NodeUid> {
if !self.netinfo.is_node_validator(sender_id) {
return Err(ErrorKind::UnknownSender.into());
}
let fault_log = match message {
BroadcastMessage::Value(p) => self.handle_value(sender_id, p)?,
BroadcastMessage::Echo(p) => self.handle_echo(sender_id, p)?,
BroadcastMessage::Ready(ref hash) => self
.handle_ready(sender_id, hash)
.map(|()| FaultLog::new())?,
};
self.step(fault_log)
match message {
BroadcastMessage::Value(p) => self.handle_value(sender_id, p),
BroadcastMessage::Echo(p) => self.handle_echo(sender_id, p),
BroadcastMessage::Ready(ref hash) => self.handle_ready(sender_id, hash),
}
}
fn terminated(&self) -> bool {
@ -293,25 +286,15 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
decided: false,
echos: BTreeMap::new(),
readys: BTreeMap::new(),
messages: VecDeque::new(),
output: None,
})
}
fn step(&mut self, fault_log: FaultLog<NodeUid>) -> Result<Step<NodeUid>> {
Ok(Step::new(
self.output.take().into_iter().collect(),
fault_log,
self.messages.drain(..).collect(),
))
}
/// Breaks the input value into shards of equal length and encodes them --
/// and some extra parity shards -- with a Reed-Solomon erasure coding
/// scheme. The returned value contains the shard assigned to this
/// node. That shard doesn't need to be sent anywhere. It gets recorded in
/// the broadcast instance.
fn send_shards(&mut self, mut value: Vec<u8>) -> Result<Proof<Vec<u8>>> {
fn send_shards(&mut self, mut value: Vec<u8>) -> Result<(Proof<Vec<u8>>, Step<NodeUid>)> {
let data_shard_num = self.coding.data_shard_count();
let parity_shard_num = self.coding.parity_shard_count();
@ -366,6 +349,7 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
let mut result = Err(ErrorKind::ProofConstructionFailed.into());
assert_eq!(self.netinfo.num_nodes(), mtree.iter().count());
let mut step = Step::default();
// Send each proof to a node.
for (leaf_value, uid) in mtree.iter().zip(self.netinfo.all_uids()) {
let proof = mtree
@ -377,19 +361,15 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
} else {
// Rest of the proofs are sent to remote nodes.
let msg = Target::Node(uid.clone()).message(BroadcastMessage::Value(proof));
self.messages.push_back(msg);
step.messages.push_back(msg);
}
}
result
result.map(|proof| (proof, step))
}
/// Handles a received echo and verifies the proof it contains.
fn handle_value(
&mut self,
sender_id: &NodeUid,
p: Proof<Vec<u8>>,
) -> Result<FaultLog<NodeUid>> {
fn handle_value(&mut self, sender_id: &NodeUid, p: Proof<Vec<u8>>) -> Result<Step<NodeUid>> {
// If the sender is not the proposer or if this is not the first `Value`, ignore.
if *sender_id != self.proposer_id {
info!(
@ -399,7 +379,7 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
self.proposer_id
);
let fault_kind = FaultKind::ReceivedValueFromNonProposer;
return Ok(FaultLog::init(sender_id.clone(), fault_kind));
return Ok(Fault::new(sender_id.clone(), fault_kind).into());
}
if self.echo_sent {
info!(
@ -408,12 +388,12 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
);
// TODO: should receiving two Values from a node be considered
// a fault? If so, return a `Fault` here. For now, ignore.
return Ok(FaultLog::new());
return Ok(Step::default());
}
// If the proof is invalid, log the faulty node behavior and ignore.
if !self.validate_proof(&p, &self.netinfo.our_uid()) {
return Ok(FaultLog::init(sender_id.clone(), FaultKind::InvalidProof));
return Ok(Fault::new(sender_id.clone(), FaultKind::InvalidProof).into());
}
// Otherwise multicast the proof in an `Echo` message, and handle it ourselves.
@ -421,8 +401,7 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
}
/// Handles a received `Echo` message.
fn handle_echo(&mut self, sender_id: &NodeUid, p: Proof<Vec<u8>>) -> Result<FaultLog<NodeUid>> {
let mut fault_log = FaultLog::new();
fn handle_echo(&mut self, sender_id: &NodeUid, p: Proof<Vec<u8>>) -> Result<Step<NodeUid>> {
// If the sender has already sent `Echo`, ignore.
if self.echos.contains_key(sender_id) {
info!(
@ -430,13 +409,12 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
self.netinfo.our_uid(),
sender_id,
);
return Ok(fault_log);
return Ok(Step::default());
}
// If the proof is invalid, log the faulty-node behavior, and ignore.
if !self.validate_proof(&p, sender_id) {
fault_log.append(sender_id.clone(), FaultKind::InvalidProof);
return Ok(fault_log);
return Ok(Fault::new(sender_id.clone(), FaultKind::InvalidProof).into());
}
let hash = p.root_hash.clone();
@ -447,17 +425,15 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
if self.ready_sent
|| self.count_echos(&hash) < self.netinfo.num_nodes() - self.netinfo.num_faulty()
{
self.compute_output(&hash)?;
return Ok(fault_log);
return self.compute_output(&hash);
}
// Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`.
self.send_ready(&hash)?;
Ok(fault_log)
self.send_ready(&hash)
}
/// Handles a received `Ready` message.
fn handle_ready(&mut self, sender_id: &NodeUid, hash: &[u8]) -> Result<()> {
fn handle_ready(&mut self, sender_id: &NodeUid, hash: &[u8]) -> Result<Step<NodeUid>> {
// If the sender has already sent a `Ready` before, ignore.
if self.readys.contains_key(sender_id) {
info!(
@ -465,52 +441,56 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
self.netinfo.our_uid(),
sender_id
);
return Ok(());
return Ok(Step::default());
}
self.readys.insert(sender_id.clone(), hash.to_vec());
let mut step = Step::default();
// Upon receiving f + 1 matching Ready(h) messages, if Ready
// has not yet been sent, multicast Ready(h).
if self.count_readys(hash) == self.netinfo.num_faulty() + 1 && !self.ready_sent {
// Enqueue a broadcast of a Ready message.
self.send_ready(hash)?;
step.extend(self.send_ready(hash)?);
}
self.compute_output(hash)
step.extend(self.compute_output(hash)?);
Ok(step)
}
/// Sends an `Echo` message and handles it. Does nothing if we are only an observer.
fn send_echo(&mut self, p: Proof<Vec<u8>>) -> Result<FaultLog<NodeUid>> {
fn send_echo(&mut self, p: Proof<Vec<u8>>) -> Result<Step<NodeUid>> {
self.echo_sent = true;
if !self.netinfo.is_validator() {
return Ok(FaultLog::new());
return Ok(Step::default());
}
let echo_msg = Target::All.message(BroadcastMessage::Echo(p.clone()));
self.messages.push_back(echo_msg);
let echo_msg = BroadcastMessage::Echo(p.clone());
let mut step: Step<_> = Target::All.message(echo_msg).into();
let our_uid = &self.netinfo.our_uid().clone();
self.handle_echo(our_uid, p)
step.extend(self.handle_echo(our_uid, p)?);
Ok(step)
}
/// Sends a `Ready` message and handles it. Does nothing if we are only an observer.
fn send_ready(&mut self, hash: &[u8]) -> Result<()> {
fn send_ready(&mut self, hash: &[u8]) -> Result<Step<NodeUid>> {
self.ready_sent = true;
if !self.netinfo.is_validator() {
return Ok(());
return Ok(Step::default());
}
let ready_msg = Target::All.message(BroadcastMessage::Ready(hash.to_vec()));
self.messages.push_back(ready_msg);
let ready_msg = BroadcastMessage::Ready(hash.to_vec());
let mut step: Step<_> = Target::All.message(ready_msg).into();
let our_uid = &self.netinfo.our_uid().clone();
self.handle_ready(our_uid, hash)
step.extend(self.handle_ready(our_uid, hash)?);
Ok(step)
}
/// Checks whether the condition for output are met for this hash, and if so, sets the output
/// value.
fn compute_output(&mut self, hash: &[u8]) -> Result<()> {
fn compute_output(&mut self, hash: &[u8]) -> Result<Step<NodeUid>> {
if self.decided
|| self.count_readys(hash) <= 2 * self.netinfo.num_faulty()
|| self.count_echos(hash) <= self.netinfo.num_faulty()
{
return Ok(());
return Ok(Step::default());
}
// Upon receiving 2f + 1 matching Ready(h) messages, wait for N 2f Echo messages.
@ -527,10 +507,14 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
})
})
.collect();
let value = decode_from_shards(&mut leaf_values, &self.coding, self.data_shard_num, hash);
self.decided = value.is_some();
self.output = value;
Ok(())
if let Some(value) =
decode_from_shards(&mut leaf_values, &self.coding, self.data_shard_num, hash)
{
self.decided = true;
Ok(Step::default().with_output(value))
} else {
Ok(Step::default())
}
}
/// Returns `true` if the proof is valid and has the same index as the node ID. Otherwise