added the proposer ID to common_subset::handle_broadcast and mae it an interface fn

This commit is contained in:
Vladimir Komendantskiy 2018-05-10 11:01:25 +01:00
parent 51ef11b55c
commit cf33bac533
1 changed files with 26 additions and 45 deletions

View File

@ -18,19 +18,6 @@ type ProposedValue = Vec<u8>;
// Type of output from the Common Subset message handler.
type CommonSubsetOutput<NodeUid> = (Option<HashSet<ProposedValue>>, VecDeque<Output<NodeUid>>);
/// Input from a remote node to Common Subset.
pub enum Input<NodeUid> {
/// Message from a remote node `uid` to the broadcast instance `uid`.
Broadcast(NodeUid, BroadcastMessage<ProposedValue>),
/// Message from a remote node `message_sender_id` concerning the common
/// subset element proposed by the node `element_proposer_id`.
Agreement {
message_sender_id: NodeUid,
element_proposer_id: NodeUid,
message: AgreementMessage,
},
}
/// Output from Common Subset to remote nodes.
pub enum Output<NodeUid> {
/// A broadcast message to be sent to the destination set in the
@ -118,34 +105,22 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
}
}
/// Receive input from a remote node. The output contains an optional result
/// of the Common Subset algorithm - a set of proposed values - and a queue
/// of messages to be sent to remote nodes, or an error.
pub fn on_input(
/// Receives a broadcast message from a remote node `sender_id` concerning a
/// value proposed by the node `proposer_id`. The output contains an
/// optional result of the Common Subset algorithm - a set of proposed
/// values - and a queue of messages to be sent to remote nodes, or an
/// error.
pub fn handle_broadcast(
&mut self,
input_message: Input<NodeUid>,
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
match input_message {
Input::Broadcast(uid, bmessage) => self.on_input_broadcast(&uid, bmessage),
Input::Agreement {
message_sender_id,
element_proposer_id,
message,
} => self.on_input_agreement(message_sender_id, element_proposer_id, &message),
}
}
fn on_input_broadcast(
&mut self,
uid: &NodeUid,
sender_id: &NodeUid,
proposer_id: &NodeUid,
bmessage: BroadcastMessage<ProposedValue>,
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
let mut instance_result = None;
let input_result: Result<VecDeque<Output<NodeUid>>, Error> = {
if let Some(broadcast_instance) = self.broadcast_instances.get(&uid) {
if let Some(broadcast_instance) = self.broadcast_instances.get(proposer_id) {
broadcast_instance
.handle_broadcast_message(&uid, bmessage)
.handle_broadcast_message(sender_id, bmessage)
.map(|(opt_value, queue)| {
instance_result = opt_value;
queue.into_iter().map(Output::Broadcast).collect()
@ -157,8 +132,8 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
};
let mut opt_message: Option<AgreementMessage> = None;
if let Some(value) = instance_result {
self.broadcast_results.insert(uid.clone(), value);
opt_message = self.on_broadcast_result(&uid)?;
self.broadcast_results.insert(proposer_id.clone(), value);
opt_message = self.on_broadcast_result(proposer_id)?;
}
input_result.map(|mut queue| {
if let Some(agreement_message) = opt_message {
@ -169,17 +144,22 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
})
}
fn on_input_agreement(
/// Receives an agreement message from a remote node `sender_id` concerning
/// a value proposed by the node `proposer_id`. The output contains an
/// optional result of the Common Subset algorithm - a set of proposed
/// values - and a queue of messages to be sent to remote nodes, or an
/// error.
pub fn handle_agreement(
&mut self,
msg_sender_id: NodeUid,
element_proposer_id: NodeUid,
sender_id: &NodeUid,
proposer_id: &NodeUid,
amessage: &AgreementMessage,
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
// The result defaults to error.
let mut result = Err(Error::NoSuchAgreementInstance);
// Send the message to the local instance of Agreement
if let Some(agreement_instance) = self.agreement_instances.get_mut(&element_proposer_id) {
if let Some(agreement_instance) = self.agreement_instances.get_mut(proposer_id) {
// Optional output of agreement and outgoing agreement
// messages to remote nodes.
result = if agreement_instance.terminated() {
@ -188,7 +168,7 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
} else {
// Send the message to the agreement instance.
agreement_instance
.handle_agreement_message(msg_sender_id, &amessage)
.handle_agreement_message(sender_id.clone(), &amessage)
.map_err(Error::from)
}
}
@ -196,7 +176,7 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
if let Ok((output, mut outgoing)) = result {
// Process Agreement outputs.
if let Some(b) = output {
outgoing.append(&mut self.on_agreement_result(element_proposer_id, b)?);
outgoing.append(&mut self.on_agreement_result(proposer_id, b)?);
}
// Check whether Agreement has completed.
@ -215,14 +195,15 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
/// instance `uid`.
fn on_agreement_result(
&mut self,
element_proposer_id: NodeUid,
element_proposer_id: &NodeUid,
result: bool,
) -> Result<VecDeque<AgreementMessage>, Error> {
let mut outgoing = VecDeque::new();
// Upon delivery of value 1 from at least N f instances of BA, provide
// input 0 to each instance of BA that has not yet been provided input.
if result {
self.agreement_results.insert(element_proposer_id, result);
self.agreement_results
.insert(element_proposer_id.clone(), result);
// The number of instances of BA that output 1.
let results1 = self.agreement_results.values().filter(|v| **v).count();