Remove interior mutability.

The `RwLock` is not needed anymore, since the broadcast implementation
doesn't handle any threading internally.
This commit is contained in:
Andreas Fackler 2018-05-14 09:35:34 +02:00
parent 0d005ebdc9
commit 71fa32c18f
4 changed files with 72 additions and 88 deletions

View File

@ -123,7 +123,7 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
// corresponding to this instance, and no dedicated comms task. The
// node index is 0.
let broadcast_handle = scope.spawn(move || {
let broadcast = Broadcast::new(our_id, proposer_id, (0..num_nodes).collect())
let mut broadcast = Broadcast::new(our_id, proposer_id, (0..num_nodes).collect())
.expect("failed to instantiate broadcast");
if let Some(v) = value {

View File

@ -7,7 +7,6 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::{self, Debug};
use std::hash::Hash;
use std::iter;
use std::sync::{RwLock, RwLockWriteGuard};
use messaging::{Target, TargetedMessage};
@ -32,37 +31,6 @@ impl fmt::Debug for BroadcastMessage {
}
}
struct BroadcastState<NodeUid> {
/// Whether we have already multicas `Echo`.
echo_sent: bool,
/// Whether we have already multicast `Ready`.
ready_sent: bool,
/// Whether we have already output a value.
has_output: bool,
/// The proofs we have received via `Echo` messages, by sender ID.
echos: BTreeMap<NodeUid, Proof<Vec<u8>>>,
/// The root hashes we received via `Ready` messages, by sender ID.
readys: BTreeMap<NodeUid, Vec<u8>>,
}
impl<NodeUid: Eq + Hash + Ord> BroadcastState<NodeUid> {
/// Returns the number of nodes that have sent us an `Echo` message with this hash.
fn count_echos(&self, hash: &[u8]) -> usize {
self.echos
.values()
.filter(|p| p.root_hash.as_slice() == hash)
.count()
}
/// Returns the number of nodes that have sent us a `Ready` message with this hash.
fn count_readys(&self, hash: &[u8]) -> usize {
self.readys
.values()
.filter(|h| h.as_slice() == hash)
.count()
}
}
/// Reliable Broadcast algorithm instance.
///
/// The Reliable Broadcast Protocol assumes a network of `N` nodes that send signed messages to
@ -114,10 +82,16 @@ pub struct Broadcast<NodeUid> {
num_faulty_nodes: usize,
data_shard_num: usize,
coding: ReedSolomon,
/// All the mutable state is confined to the `state` field. This allows to
/// mutate state even when the broadcast instance is referred to by an
/// immutable reference.
state: RwLock<BroadcastState<NodeUid>>,
/// Whether we have already multicast `Echo`.
echo_sent: bool,
/// Whether we have already multicast `Ready`.
ready_sent: bool,
/// Whether we have already output a value.
has_output: bool,
/// The proofs we have received via `Echo` messages, by sender ID.
echos: BTreeMap<NodeUid, Proof<Vec<u8>>>,
/// The root hashes we received via `Ready` messages, by sender ID.
readys: BTreeMap<NodeUid, Vec<u8>>,
}
impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
@ -142,18 +116,16 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
num_faulty_nodes,
data_shard_num,
coding,
state: RwLock::new(BroadcastState {
echo_sent: false,
ready_sent: false,
has_output: false,
echos: BTreeMap::new(),
readys: BTreeMap::new(),
}),
echo_sent: false,
ready_sent: false,
has_output: false,
echos: BTreeMap::new(),
readys: BTreeMap::new(),
})
}
/// Processes the proposed value input by broadcasting it.
pub fn propose_value(&self, value: Vec<u8>) -> Result<MessageQueue<NodeUid>, Error> {
pub fn propose_value(&mut self, value: Vec<u8>) -> Result<MessageQueue<NodeUid>, Error> {
if self.our_id != self.proposer_id {
return Err(Error::UnexpectedMessage);
}
@ -163,8 +135,8 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
let (proof, value_msgs) = self.send_shards(value)?;
// TODO: We'd actually need to return the output here, if it was only one node. Should that
// use-case be supported?
let state = self.state.write().unwrap();
let (_, echo_msgs) = self.handle_value(&self.our_id, proof, state)?;
let our_id = self.our_id.clone();
let (_, echo_msgs) = self.handle_value(&our_id, proof)?;
Ok(value_msgs.into_iter().chain(echo_msgs).collect())
}
@ -254,27 +226,25 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
/// Handler of messages received from remote nodes.
pub fn handle_broadcast_message(
&self,
&mut self,
sender_id: &NodeUid,
message: BroadcastMessage,
) -> Result<(Option<Vec<u8>>, MessageQueue<NodeUid>), Error> {
if !self.all_uids.contains(sender_id) {
return Err(Error::UnknownSender);
}
let state = self.state.write().unwrap();
match message {
BroadcastMessage::Value(p) => self.handle_value(sender_id, p, state),
BroadcastMessage::Echo(p) => self.handle_echo(sender_id, p, state),
BroadcastMessage::Ready(ref hash) => self.handle_ready(sender_id, hash, state),
BroadcastMessage::Value(p) => self.handle_value(sender_id, p),
BroadcastMessage::Echo(p) => self.handle_echo(sender_id, p),
BroadcastMessage::Ready(ref hash) => self.handle_ready(sender_id, hash),
}
}
/// Handles a received echo and verifies the proof it contains.
fn handle_value(
&self,
&mut self,
sender_id: &NodeUid,
p: Proof<Vec<u8>>,
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
) -> Result<(Option<Vec<u8>>, MessageQueue<NodeUid>), Error> {
// If the sender is not the proposer, this is not the first `Value` or the proof is invalid,
// ignore.
@ -285,7 +255,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
);
return Ok((None, VecDeque::new()));
}
if state.echo_sent {
if self.echo_sent {
info!("Node {:?} received multiple Values.", self.our_id);
return Ok((None, VecDeque::new()));
}
@ -294,8 +264,9 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
}
// Otherwise multicast the proof in an `Echo` message, and handle it ourselves.
state.echo_sent = true;
let (output, echo_msgs) = self.handle_echo(&self.our_id, p.clone(), state)?;
self.echo_sent = true;
let our_id = self.our_id.clone();
let (output, echo_msgs) = self.handle_echo(&our_id, p.clone())?;
let msgs = iter::once(Target::All.message(BroadcastMessage::Echo(p)))
.chain(echo_msgs)
.collect();
@ -305,13 +276,12 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
/// Handles a received `Echo` message.
fn handle_echo(
&self,
&mut self,
sender_id: &NodeUid,
p: Proof<Vec<u8>>,
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
) -> Result<(Option<Vec<u8>>, MessageQueue<NodeUid>), Error> {
// If the proof is invalid or the sender has already sent `Echo`, ignore.
if state.echos.contains_key(sender_id) {
if self.echos.contains_key(sender_id) {
info!(
"Node {:?} received multiple Echos from {:?}.",
self.our_id, sender_id,
@ -325,28 +295,28 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
let hash = p.root_hash.clone();
// Save the proof for reconstructing the tree later.
state.echos.insert(sender_id.clone(), p);
self.echos.insert(sender_id.clone(), p);
if state.ready_sent || state.count_echos(&hash) < self.num_nodes - self.num_faulty_nodes {
return Ok((self.get_output(state, &hash)?, VecDeque::new()));
if self.ready_sent || self.count_echos(&hash) < self.num_nodes - self.num_faulty_nodes {
return Ok((self.get_output(&hash)?, VecDeque::new()));
}
// Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`.
state.ready_sent = true;
self.ready_sent = true;
let msg = Target::All.message(BroadcastMessage::Ready(hash.clone()));
let (output, ready_msgs) = self.handle_ready(&self.our_id, &hash, state)?;
let our_id = self.our_id.clone();
let (output, ready_msgs) = self.handle_ready(&our_id, &hash)?;
Ok((output, iter::once(msg).chain(ready_msgs).collect()))
}
/// Handles a received `Ready` message.
fn handle_ready(
&self,
&mut self,
sender_id: &NodeUid,
hash: &[u8],
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
) -> Result<(Option<Vec<u8>>, MessageQueue<NodeUid>), Error> {
// If the sender has already sent a `Ready` before, ignore.
if state.readys.contains_key(sender_id) {
if self.readys.contains_key(sender_id) {
info!(
"Node {:?} received multiple Readys from {:?}.",
self.our_id, sender_id
@ -354,41 +324,36 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
return Ok((None, VecDeque::new()));
}
state.readys.insert(sender_id.clone(), hash.to_vec());
self.readys.insert(sender_id.clone(), hash.to_vec());
// Upon receiving f + 1 matching Ready(h) messages, if Ready
// has not yet been sent, multicast Ready(h).
let outgoing = if state.count_readys(hash) == self.num_faulty_nodes + 1 && !state.ready_sent
{
let outgoing = if self.count_readys(hash) == self.num_faulty_nodes + 1 && !self.ready_sent {
// Enqueue a broadcast of a Ready message.
state.ready_sent = true;
self.ready_sent = true;
iter::once(Target::All.message(BroadcastMessage::Ready(hash.to_vec()))).collect()
} else {
VecDeque::new()
};
Ok((self.get_output(state, hash)?, outgoing))
Ok((self.get_output(hash)?, outgoing))
}
/// Checks whether the condition for output are met for this hash, and if so, returns the output
/// value.
fn get_output(
&self,
mut state: RwLockWriteGuard<BroadcastState<NodeUid>>,
hash: &[u8],
) -> Result<Option<Vec<u8>>, Error> {
if state.has_output || state.count_readys(hash) <= 2 * self.num_faulty_nodes
|| state.count_echos(hash) <= self.num_faulty_nodes
fn get_output(&mut self, hash: &[u8]) -> Result<Option<Vec<u8>>, Error> {
if self.has_output || self.count_readys(hash) <= 2 * self.num_faulty_nodes
|| self.count_echos(hash) <= self.num_faulty_nodes
{
return Ok(None);
}
// Upon receiving 2f + 1 matching Ready(h) messages, wait for N 2f Echo messages.
state.has_output = true;
self.has_output = true;
let mut leaf_values: Vec<Option<Box<[u8]>>> = self.all_uids
.iter()
.map(|id| {
state.echos.get(id).and_then(|p| {
self.echos.get(id).and_then(|p| {
if p.root_hash.as_slice() == hash {
Some(p.value.clone().into_boxed_slice())
} else {
@ -434,6 +399,22 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
true
}
}
/// Returns the number of nodes that have sent us an `Echo` message with this hash.
fn count_echos(&self, hash: &[u8]) -> usize {
self.echos
.values()
.filter(|p| p.root_hash.as_slice() == hash)
.count()
}
/// Returns the number of nodes that have sent us a `Ready` message with this hash.
fn count_readys(&self, hash: &[u8]) -> usize {
self.readys
.values()
.filter(|h| h.as_slice() == hash)
.count()
}
}
/// Errors returned by the broadcast instance.

View File

@ -103,15 +103,16 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
/// Common Subset input message handler. It receives a value for broadcast
/// and redirects it to the corresponding broadcast instance.
pub fn send_proposed_value(
&self,
&mut self,
value: ProposedValue,
) -> Result<VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>, Error> {
// Upon receiving input v_i , input v_i to RBC_i. See Figure 2.
if let Some(instance) = self.broadcast_instances.get(&self.uid) {
if let Some(instance) = self.broadcast_instances.get_mut(&self.uid) {
let uid = self.uid.clone();
Ok(instance
.propose_value(value)?
.into_iter()
.map(|msg| msg.map(|b_msg| Message::Broadcast(self.uid.clone(), b_msg)))
.map(|msg| msg.map(|b_msg| Message::Broadcast(uid.clone(), b_msg)))
.collect())
} else {
Err(Error::NoSuchBroadcastInstance)
@ -162,7 +163,7 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>,
Error,
> = {
if let Some(broadcast_instance) = self.broadcast_instances.get(proposer_id) {
if let Some(broadcast_instance) = self.broadcast_instances.get_mut(proposer_id) {
broadcast_instance
.handle_broadcast_message(sender_id, bmessage)
.map(|(opt_value, queue)| {

View File

@ -174,7 +174,7 @@ impl Adversary for ProposeAdversary {
.chain(self.good_nodes.iter().cloned())
.collect();
let id = *self.adv_nodes.iter().next().unwrap();
let bc = Broadcast::new(id, id, node_ids).expect("broadcast instance");
let mut bc = Broadcast::new(id, id, node_ids).expect("broadcast instance");
let msgs = bc.propose_value(value.to_vec()).expect("propose");
msgs.into_iter().map(|msg| (id, msg)).collect()
}
@ -261,7 +261,9 @@ impl<A: Adversary> TestNetwork<A> {
/// Makes the node `proposer_id` propose a value.
fn propose_value(&mut self, proposer_id: NodeId, value: ProposedValue) {
let msgs = self.nodes[&proposer_id]
let msgs = self.nodes
.get_mut(&proposer_id)
.expect("proposer instance")
.broadcast
.propose_value(value)
.expect("propose");