mirror of https://github.com/poanetwork/hbbft.git
defined the output from the Common Subset algorithm
This commit is contained in:
parent
d3b974f888
commit
5215156ec5
|
@ -74,6 +74,11 @@ impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
|
|||
let mut outgoing = VecDeque::new();
|
||||
|
||||
match *message {
|
||||
_ if self.terminated => {
|
||||
// The algorithm instance has already terminated.
|
||||
Err(Error::Terminated)
|
||||
}
|
||||
|
||||
AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => {
|
||||
update_map_of_sets(&mut self.received_bval, uid, b);
|
||||
let count_bval = self.received_bval.iter().fold(0, |count, (_, values)| {
|
||||
|
@ -113,7 +118,7 @@ impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
|
|||
}
|
||||
}
|
||||
|
||||
AgreementMessage::Aux((_epoch, b)) => {
|
||||
AgreementMessage::Aux((epoch, b)) if epoch == self.epoch => {
|
||||
update_map_of_sets(&mut self.received_aux, uid, b);
|
||||
if !self.bin_values.is_empty() {
|
||||
let coin_result = self.try_coin();
|
||||
|
@ -231,5 +236,6 @@ where
|
|||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum Error {
|
||||
Terminated,
|
||||
NotImplemented,
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@ use proto::AgreementMessage;
|
|||
|
||||
// TODO: Make this a generic argument of `Broadcast`.
|
||||
type ProposedValue = Vec<u8>;
|
||||
// Type of output from the Common Subset message handler.
|
||||
type CommonSubsetOutput<NodeUid> = (Option<HashSet<ProposedValue>>, VecDeque<Output<NodeUid>>);
|
||||
|
||||
/// Input from a remote node to Common Subset.
|
||||
pub enum Input<NodeUid> {
|
||||
|
@ -27,9 +29,6 @@ pub enum Input<NodeUid> {
|
|||
}
|
||||
|
||||
/// Output from Common Subset to remote nodes.
|
||||
///
|
||||
/// FIXME: We can do an interface that doesn't need this type and instead works
|
||||
/// directly with the `TargetBroadcastMessage` and `AgreementMessage`.
|
||||
pub enum Output<NodeUid> {
|
||||
/// A broadcast message to be sent to the destination set in the
|
||||
/// `TargetedBroadcastMessage`.
|
||||
|
@ -46,6 +45,8 @@ pub struct CommonSubset<NodeUid: Eq + Hash + Ord> {
|
|||
broadcast_instances: HashMap<NodeUid, Broadcast<NodeUid>>,
|
||||
agreement_instances: HashMap<NodeUid, Agreement<NodeUid>>,
|
||||
broadcast_results: HashMap<NodeUid, ProposedValue>,
|
||||
/// FIXME: The result may be a set of bool rather than a single bool due to
|
||||
/// the ability of Agreement to output multiple values.
|
||||
agreement_results: HashMap<NodeUid, bool>,
|
||||
}
|
||||
|
||||
|
@ -104,10 +105,7 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
|||
|
||||
/// Upon delivery of v_j from RBC_j, if input has not yet been provided to
|
||||
/// BA_j, then provide input 1 to BA_j. See Figure 11.
|
||||
pub fn on_broadcast_result(
|
||||
&mut self,
|
||||
uid: &NodeUid,
|
||||
) -> Result<Option<AgreementMessage>, Error> {
|
||||
fn on_broadcast_result(&mut self, uid: &NodeUid) -> Result<Option<AgreementMessage>, Error> {
|
||||
if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) {
|
||||
if !agreement_instance.has_input() {
|
||||
Ok(Some(agreement_instance.set_input(true)))
|
||||
|
@ -119,20 +117,22 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Receive input from a remote node.
|
||||
/// Receive input from a remote node. The output contains an optional result
|
||||
/// of the Common Subset algorithm - a set of proposed values - and a queue
|
||||
/// of messages to be sent to remote nodes, or an error.
|
||||
pub fn on_input(
|
||||
&mut self,
|
||||
message: Input<NodeUid>,
|
||||
) -> Result<VecDeque<Output<NodeUid>>, Error> {
|
||||
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
|
||||
match message {
|
||||
Input::Broadcast(uid, bmessage) => {
|
||||
let mut instance_result = None;
|
||||
let input_result = {
|
||||
let input_result: Result<VecDeque<Output<NodeUid>>, Error> = {
|
||||
if let Some(broadcast_instance) = self.broadcast_instances.get(&uid) {
|
||||
broadcast_instance
|
||||
.handle_broadcast_message(&uid, bmessage)
|
||||
.map(|(value, queue)| {
|
||||
instance_result = value;
|
||||
.map(|(opt_value, queue)| {
|
||||
instance_result = opt_value;
|
||||
queue.into_iter().map(Output::Broadcast).collect()
|
||||
})
|
||||
.map_err(Error::from)
|
||||
|
@ -140,17 +140,24 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
|||
Err(Error::NoSuchBroadcastInstance)
|
||||
}
|
||||
};
|
||||
if instance_result.is_some() {
|
||||
self.on_broadcast_result(&uid)?;
|
||||
let mut opt_message: Option<AgreementMessage> = None;
|
||||
if let Some(value) = instance_result {
|
||||
self.broadcast_results.insert(uid.clone(), value);
|
||||
opt_message = self.on_broadcast_result(&uid)?;
|
||||
}
|
||||
input_result
|
||||
input_result.map(|mut queue| {
|
||||
if let Some(agreement_message) = opt_message {
|
||||
// Append the message to agreement nodes to the common output queue.
|
||||
queue.push_back(Output::Agreement(agreement_message))
|
||||
}
|
||||
(None, queue)
|
||||
})
|
||||
}
|
||||
|
||||
Input::Agreement(uid, amessage) => {
|
||||
// The result defaults to error.
|
||||
let mut result = Err(Error::NoSuchAgreementInstance);
|
||||
|
||||
// FIXME: send the message to the Agreement instance and
|
||||
if let Some(mut agreement_instance) = self.agreement_instances.get_mut(&uid) {
|
||||
// Optional output of agreement and outgoing agreement
|
||||
// messages to remote nodes.
|
||||
|
@ -158,6 +165,7 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
|||
// This instance has terminated and does not accept input.
|
||||
Ok((None, VecDeque::new()))
|
||||
} else {
|
||||
// Send the message to the agreement instance.
|
||||
agreement_instance
|
||||
.on_input(uid.clone(), &amessage)
|
||||
.map_err(Error::from)
|
||||
|
@ -168,11 +176,15 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
|||
if let Some(b) = output {
|
||||
outgoing.append(&mut self.on_agreement_result(uid, b));
|
||||
}
|
||||
Ok(outgoing.into_iter().map(Output::Agreement).collect())
|
||||
Ok((
|
||||
self.try_agreement_completion(),
|
||||
outgoing.into_iter().map(Output::Agreement).collect(),
|
||||
))
|
||||
} else {
|
||||
// error
|
||||
result
|
||||
.map(|(_, messages)| messages.into_iter().map(Output::Agreement).collect())
|
||||
result.map(|(_, messages)| {
|
||||
(None, messages.into_iter().map(Output::Agreement).collect())
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -180,25 +192,20 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
|||
|
||||
/// Callback to be invoked on receipt of a returned value of the Agreement
|
||||
/// instance `uid`.
|
||||
///
|
||||
/// FIXME: It is likely that only one `AgreementMessage` is required because
|
||||
/// Figure 11 does not count the number of messages but the number of nodes
|
||||
/// that sent messages.
|
||||
fn on_agreement_result(&mut self, uid: NodeUid, result: bool) -> VecDeque<AgreementMessage> {
|
||||
let mut outgoing = VecDeque::new();
|
||||
// Upon delivery of value 1 from at least N − f instances of BA, provide
|
||||
// input 0 to each instance of BA that has not yet been provided input.
|
||||
if result {
|
||||
self.agreement_results.insert(uid, result);
|
||||
let results1: Vec<bool> = self.agreement_results
|
||||
.iter()
|
||||
.map(|(_, v)| *v)
|
||||
.filter(|b| *b)
|
||||
.collect();
|
||||
// The number of instances of BA that output 1.
|
||||
let results1: usize =
|
||||
self.agreement_results
|
||||
.iter()
|
||||
.fold(0, |count, (_, v)| if *v { count + 1 } else { count });
|
||||
|
||||
if results1.len() >= self.num_nodes - self.num_faulty_nodes {
|
||||
let instances = &mut self.agreement_instances;
|
||||
for (_uid0, instance) in instances.iter_mut() {
|
||||
if results1 >= self.num_nodes - self.num_faulty_nodes {
|
||||
for instance in self.agreement_instances.values_mut() {
|
||||
if !instance.has_input() {
|
||||
outgoing.push_back(instance.set_input(false));
|
||||
}
|
||||
|
@ -208,24 +215,19 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
|
|||
outgoing
|
||||
}
|
||||
|
||||
pub fn on_agreement_completion(&self) -> Option<HashSet<ProposedValue>> {
|
||||
fn try_agreement_completion(&self) -> Option<HashSet<ProposedValue>> {
|
||||
// Once all instances of BA have completed, let C ⊂ [1..N] be
|
||||
// the indexes of each BA that delivered 1. Wait for the output
|
||||
// v_j for each RBC_j such that j∈C. Finally output ∪ j∈C v_j.
|
||||
let instance_uids: HashSet<NodeUid> = self.agreement_instances
|
||||
if self.agreement_instances
|
||||
.iter()
|
||||
.map(|(k, _)| k.clone())
|
||||
.collect();
|
||||
let completed_uids: HashSet<NodeUid> = self.agreement_results
|
||||
.iter()
|
||||
.map(|(k, _)| k.clone())
|
||||
.collect();
|
||||
if instance_uids == completed_uids {
|
||||
// All instances of Agreement that delivered `true`.
|
||||
let delivered_1: HashSet<NodeUid> = self.agreement_results
|
||||
.all(|(_, instance)| instance.terminated())
|
||||
{
|
||||
// All instances of Agreement that delivered `true` (or "1" in the paper).
|
||||
let delivered_1: HashSet<&NodeUid> = self.agreement_results
|
||||
.iter()
|
||||
.filter(|(_, v)| **v)
|
||||
.map(|(k, _)| k.clone())
|
||||
.map(|(k, _)| k)
|
||||
.collect();
|
||||
// Results of Broadcast instances in `delivered_1`
|
||||
let broadcast_results: HashSet<ProposedValue> = self.broadcast_results
|
||||
|
|
Loading…
Reference in New Issue