updated Common Subset with the new NodeUid type parameter

This commit is contained in:
Vladimir Komendantskiy 2018-05-02 08:15:47 +01:00
parent 579abbf415
commit fbb69baa3c
1 changed files with 50 additions and 35 deletions

View File

@ -3,6 +3,8 @@
use crossbeam_channel::{SendError, Sender};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::RwLock;
use std::hash::Hash;
use std::fmt::{Debug, Display};
use agreement;
use agreement::Agreement;
@ -11,12 +13,12 @@ use broadcast;
use broadcast::{Broadcast, TargetedBroadcastMessage};
use messaging;
use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, NodeUid,
use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState,
ProposedValue, QMessage, RemoteMessage};
use proto::{BroadcastMessage, AgreementMessage};
pub enum Message {
pub enum Input<NodeUid> {
/// Local message to initiate broadcast of a value.
CommonSubset(ProposedValue),
/// Message from a remote node `uid` to the broadcast instance `uid`.
@ -25,22 +27,26 @@ pub enum Message {
Agreement(NodeUid, AgreementMessage),
}
struct CommonSubsetState {
pub enum Output<NodeUid> {
Broadcast(TargetedBroadcastMessage<NodeUid>)
}
struct CommonSubsetState<NodeUid: Eq + Hash> {
agreement_inputs: HashMap<NodeUid, bool>,
agreement_true_outputs: HashSet<NodeUid>,
agreements_without_input: HashSet<NodeUid>,
broadcast_instances: HashMap<NodeUid, Broadcast<NodeUid>>,
agreement_instances: HashMap<NodeUid, Agreement>,
}
pub struct CommonSubset {
pub struct CommonSubset<NodeUid: Eq + Hash> {
uid: NodeUid,
num_nodes: usize,
num_faulty_nodes: usize,
broadcast_instances: HashMap<NodeUid, Broadcast<NodeUid>>,
agreement_instances: HashMap<NodeUid, Agreement>,
state: RwLock<CommonSubsetState>,
state: RwLock<CommonSubsetState<NodeUid>>,
}
impl CommonSubset {
impl<NodeUid: Clone + Debug + Display + Eq + Hash> CommonSubset<NodeUid> {
pub fn new(uid: NodeUid, num_nodes: usize, node_uids: HashSet<NodeUid>) -> Self {
let num_faulty_nodes = (num_nodes - 1) / 3;
@ -48,14 +54,14 @@ impl CommonSubset {
uid,
num_nodes,
num_faulty_nodes,
// FIXME: instantiate broadcast instances
broadcast_instances: HashMap::new(),
// FIXME: instantiate agreement instances
agreement_instances: HashMap::new(),
state: RwLock::new(CommonSubsetState {
agreement_inputs: HashMap::new(),
agreement_true_outputs: HashSet::new(),
agreements_without_input: node_uids,
state: RwLock::new(CommonSubsetState {
agreement_inputs: HashMap::new(),
agreement_true_outputs: HashSet::new(),
agreements_without_input: node_uids,
// FIXME: instantiate broadcast instances
broadcast_instances: HashMap::new(),
// FIXME: instantiate agreement instances
agreement_instances: HashMap::new(),
}),
}
}
@ -63,60 +69,69 @@ impl CommonSubset {
/// Common Subset input message handler. It receives a value for broadcast
/// and redirects it to the corresponding broadcast instance.
pub fn on_proposed_value(&self, value: ProposedValue) ->
Result<VecDeque<RemoteMessage>, Error>
Result<VecDeque<Output<NodeUid>>, Error>
{
// Upon receiving input v_i , input v_i to RBC_i. See Figure 2.
if let Some(instance) = self.broadcast_instances.get(&self.uid) {
let state = self.state.read().unwrap();
if let Some(instance) = state.broadcast_instances.get(&self.uid) {
Ok(instance
.propose_value(value)?
.into_iter()
.map(TargetedBroadcastMessage::into_remote_message)
.map(Output::Broadcast)
.collect())
} else {
Err(Error::NoSuchBroadcastInstance(self.uid))
Err(Error::NoSuchBroadcastInstance)
}
}
/// Upon delivery of v_j from RBC_j, if input has not yet been provided to
/// BA_j, then provide input 1 to BA_j. See Figure 11.
pub fn on_broadcast_output(&mut self, uid: NodeUid) ->
pub fn on_broadcast_output(&self, uid: NodeUid) ->
Result<(), Error>
{
if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) {
let mut state = self.state.write().unwrap();
if let Some(agreement_instance) = state.agreement_instances.get_mut(&uid) {
if agreement_instance.get_input().is_none() {
agreement_instance.set_input(true);
let mut state = self.state.write().unwrap();
state.agreements_without_input.remove(&uid);
// FIXME: implement this counter indirectly by filtering the
// list of Agreement instances
//
// state.agreements_without_input.remove(&uid);
}
Ok(())
}
else {
Err(Error::NoSuchBroadcastInstance(self.uid))
Err(Error::NoSuchBroadcastInstance)
}
}
pub fn handle_input(&self, message: Message) ->
Result<VecDeque<RemoteMessage>, Error>
pub fn on_input(&self, message: Input<NodeUid>) ->
Result<VecDeque<Output<NodeUid>>, Error>
{
match message {
Message::CommonSubset(value) => self.on_proposed_value(value),
Message::Broadcast(uid, bmessage) => {
if let Some(broadcast_instance) = self.broadcast_instances.get_mut(&uid) {
broadcast_instance.handle_broadcast_message(uid, &bmessage)
Input::CommonSubset(value) => self.on_proposed_value(value),
Input::Broadcast(uid, bmessage) => {
let mut state = self.state.read().unwrap();
if let Some(broadcast_instance) = state.broadcast_instances.get(&uid) {
broadcast_instance.handle_broadcast_message(&uid, &bmessage)
.map(|(value, queue)| {
if let Some(value) = value {
if let Some(_value) = value {
// FIXME: use `value`
self.on_broadcast_output(uid);
}
queue
.into_iter()
.map(Output::Broadcast)
.collect()
})
.map_err(Error::from)
}
else {
Err(Error::NoSuchBroadcastInstance(uid))
Err(Error::NoSuchBroadcastInstance)
}
},
Message::Agreement(_uid, _message) => {
Input::Agreement(_uid, _message) => {
Err(Error::NotImplemented)
}
}
@ -180,7 +195,7 @@ where
pub enum Error {
UnexpectedMessage,
NotImplemented,
NoSuchBroadcastInstance(NodeUid),
NoSuchBroadcastInstance,
Send(SendError<QMessage>),
Broadcast(broadcast::Error),
}