diff --git a/proto/message.proto b/proto/message.proto index ff5bfd7..92ae378 100644 --- a/proto/message.proto +++ b/proto/message.proto @@ -45,8 +45,9 @@ message LemmaProto { } message AgreementProto { + uint32 epoch = 1; oneof payload { - bool bval = 1; - bool aux = 2; + bool bval = 2; + bool aux = 3; } } \ No newline at end of file diff --git a/src/agreement.rs b/src/agreement.rs index 0c733be..95b7655 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -1,26 +1,61 @@ //! Binary Byzantine agreement protocol from a common coin protocol. -use proto::AgreementMessage; -use std::collections::{BTreeSet, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::hash::Hash; -#[derive(Default)] -pub struct Agreement { +use proto::AgreementMessage; + +pub struct Agreement { + /// The UID of the corresponding node. + uid: NodeUid, + num_nodes: usize, + num_faulty_nodes: usize, + epoch: u32, input: Option, - _bin_values: BTreeSet, + /// Bin values. Reset on every epoch update. + bin_values: BTreeSet, + /// Values received in BVAL messages. Reset on every epoch update. + received_bval: HashMap>, + /// Sent BVAL values. Reset on every epoch update. + sent_bval: BTreeSet, + /// Values received in AUX messages. Reset on every epoch update. + received_aux: HashMap>, + /// All the output values in all epochs. + outputs: BTreeMap, + /// Termination flag. + terminated: bool, } -impl Agreement { - pub fn new() -> Self { +impl Agreement { + pub fn new(uid: NodeUid, num_nodes: usize) -> Self { + let num_faulty_nodes = (num_nodes - 1) / 3; + Agreement { + uid, + num_nodes, + num_faulty_nodes, + epoch: 0, input: None, - _bin_values: BTreeSet::new(), + bin_values: BTreeSet::new(), + received_bval: HashMap::new(), + sent_bval: BTreeSet::new(), + received_aux: HashMap::new(), + outputs: BTreeMap::new(), + terminated: false, } } + /// Algorithm has terminated. + pub fn terminated(&self) -> bool { + self.terminated + } + pub fn set_input(&mut self, input: bool) -> AgreementMessage { self.input = Some(input); + // Receive the BVAL message locally. + update_map_of_sets(&mut self.received_bval, self.uid.clone(), input); // Multicast BVAL - AgreementMessage::BVal(input) + AgreementMessage::BVal((self.epoch, input)) } pub fn has_input(&self) -> bool { @@ -28,12 +63,170 @@ impl Agreement { } /// Receive input from a remote node. + /// + /// Outputs an optional agreement result and a queue of agreement messages + /// to remote nodes. There can be up to 2 messages. pub fn on_input( - &self, - _message: &AgreementMessage, - ) -> Result, Error> { - Err(Error::NotImplemented) + &mut self, + uid: NodeUid, + message: &AgreementMessage, + ) -> Result<(Option, VecDeque), Error> { + let mut outgoing = VecDeque::new(); + + match *message { + AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => { + update_map_of_sets(&mut self.received_bval, uid, b); + let count_bval = self.received_bval.iter().fold(0, |count, (_, values)| { + if values.contains(&b) { + count + 1 + } else { + count + } + }); + + // upon receiving BVAL_r(b) messages from 2f + 1 nodes, + // bin_values_r := bin_values_r ∪ {b} + if count_bval == 2 * self.num_faulty_nodes + 1 { + self.bin_values.insert(b); + + // wait until bin_values_r /= 0, then multicast AUX_r(w) + // where w ∈ bin_values_r + outgoing.push_back(AgreementMessage::Aux((self.epoch, b))); + // Receive the AUX message locally. + update_map_of_sets(&mut self.received_aux, self.uid.clone(), b); + + let coin_result = self.try_coin(); + if let Some(output_message) = coin_result.1 { + outgoing.push_back(output_message); + } + Ok((coin_result.0, outgoing)) + } + // upon receiving BVAL_r(b) messages from f + 1 nodes, if + // BVAL_r(b) has not been sent, multicast BVAL_r(b) + else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) { + outgoing.push_back(AgreementMessage::BVal((self.epoch, b))); + // Receive the BVAL message locally. + update_map_of_sets(&mut self.received_bval, self.uid.clone(), b); + Ok((None, outgoing)) + } else { + Ok((None, outgoing)) + } + } + + AgreementMessage::Aux((_epoch, b)) => { + update_map_of_sets(&mut self.received_aux, uid, b); + if !self.bin_values.is_empty() { + let coin_result = self.try_coin(); + if let Some(output_message) = coin_result.1 { + outgoing.push_back(output_message); + } + Ok((coin_result.0, outgoing)) + } else { + Ok((None, outgoing)) + } + } + + _ => { + // Epoch does not match. Ignore the message. + Ok((None, outgoing)) + } + } } + + /// AUX_r messages such that the set of values carried by those messages is + /// a subset of bin_values_r. Outputs this subset. + /// + /// FIXME: Clarify whether the values of AUX messages should be the same or + /// not. It is assumed in `count_aux` that they can differ. + fn count_aux(&self) -> (usize, BTreeSet) { + let vals = BTreeSet::new(); + ( + self.received_aux.iter().fold(0, |count, (_, values)| { + if values.is_subset(&self.bin_values) { + vals.union(values); + count + 1 + } else { + count + } + }), + vals, + ) + } + + /// Wait until at least (N − f) AUX_r messages have been received, such that + /// the set of values carried by these messages, vals, are a subset of + /// bin_values_r (note that bin_values_r may continue to change as BVAL_r + /// messages are received, thus this condition may be triggered upon arrival + /// of either an AUX_r or a BVAL_r message). + /// + /// `try_coin` output an optional combination of the agreement value and the + /// agreement broadcast message. + fn try_coin(&mut self) -> (Option, Option) { + let (count_aux, vals) = self.count_aux(); + if count_aux >= self.num_nodes - self.num_faulty_nodes { + // FIXME: Implement the Common Coin algorithm. At the moment the + // coin value is constant `true`. + let coin: u64 = 1; + + let coin2 = coin % 2 != 0; + + // Check the termination condition: "continue looping until both a + // value b is output in some round r, and the value Coin_r' = b for + // some round r' > r." + self.terminated = self.terminated || self.outputs.values().any(|b| *b == coin2); + + // Prepare to start the next epoch. + self.bin_values.clear(); + + if vals.len() == 1 { + let mut message = None; + // NOTE: `vals` has exactly one element due to `vals.len() == 1` + let output: Vec> = vals.into_iter() + .take(1) + .map(|b| { + message = Some(self.set_input(b)); + + if b == coin2 { + // Record the output to perform a termination check later. + self.outputs.insert(self.epoch, b); + // Output the agreement value. + Some(b) + } else { + // Don't output a value. + None + } + }) + .collect(); + // Start the next epoch. + self.epoch += 1; + (output[0], message) + } else { + // Start the next epoch. + self.epoch += 1; + (None, Some(self.set_input(coin2))) + } + } else { + // Continue waiting for the (N - f) AUX messages. + (None, None) + } + } +} + +// Insert an element into a hash map of sets of values of type `Elt`. +fn update_map_of_sets(map: &mut HashMap>, key: Key, elt: Elt) +where + Key: Eq + Hash, + Elt: Copy + Ord, +{ + map.entry(key) + .and_modify(|values| { + values.insert(elt); + }) + .or_insert({ + let mut values = BTreeSet::new(); + values.insert(elt); + values + }); } #[derive(Clone, Debug)] diff --git a/src/common_subset.rs b/src/common_subset.rs index d92c5d4..55f85e0 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -43,15 +43,15 @@ pub struct CommonSubset { uid: NodeUid, num_nodes: usize, num_faulty_nodes: usize, - agreement_true_outputs: HashSet, broadcast_instances: HashMap>, - agreement_instances: HashMap, + agreement_instances: HashMap>, broadcast_results: HashMap, agreement_results: HashMap, } impl CommonSubset { - pub fn new(uid: NodeUid, all_uids: &HashSet, num_nodes: usize) -> Result { + pub fn new(uid: NodeUid, all_uids: &HashSet) -> Result { + let num_nodes = all_uids.len(); let num_faulty_nodes = (num_nodes - 1) / 3; // Create all broadcast instances. @@ -68,18 +68,17 @@ impl CommonSubset { } // Create all agreement instances. - let mut agreement_instances: HashMap = HashMap::new(); + let mut agreement_instances: HashMap> = HashMap::new(); for uid0 in all_uids { - agreement_instances.insert(uid0.clone(), Agreement::new()); + agreement_instances.insert(uid0.clone(), Agreement::new(uid0.clone(), num_nodes)); } Ok(CommonSubset { uid, num_nodes, num_faulty_nodes, - agreement_true_outputs: HashSet::new(), broadcast_instances, - agreement_instances: HashMap::new(), + agreement_instances, broadcast_results: HashMap::new(), agreement_results: HashMap::new(), }) @@ -109,7 +108,7 @@ impl CommonSubset { &mut self, uid: &NodeUid, ) -> Result, Error> { - if let Some(agreement_instance) = self.agreement_instances.get_mut(uid) { + if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) { if !agreement_instance.has_input() { Ok(Some(agreement_instance.set_input(true))) } else { @@ -146,11 +145,35 @@ impl CommonSubset { } input_result } - Input::Agreement(_uid, _message) => { - // FIXME: send the message to the Agreement instance and - // conditionally call `on_agreement_output` - Err(Error::NotImplemented) + Input::Agreement(uid, amessage) => { + // The result defaults to error. + let mut result = Err(Error::NoSuchAgreementInstance); + + // FIXME: send the message to the Agreement instance and + if let Some(mut agreement_instance) = self.agreement_instances.get_mut(&uid) { + // Optional output of agreement and outgoing agreement + // messages to remote nodes. + result = if agreement_instance.terminated() { + // This instance has terminated and does not accept input. + Ok((None, VecDeque::new())) + } else { + agreement_instance + .on_input(uid.clone(), &amessage) + .map_err(Error::from) + } + } + + if let Ok((output, mut outgoing)) = result { + if let Some(b) = output { + outgoing.append(&mut self.on_agreement_result(uid, b)); + } + Ok(outgoing.into_iter().map(Output::Agreement).collect()) + } else { + // error + result + .map(|(_, messages)| messages.into_iter().map(Output::Agreement).collect()) + } } } } @@ -166,9 +189,14 @@ impl CommonSubset { // Upon delivery of value 1 from at least N − f instances of BA, provide // input 0 to each instance of BA that has not yet been provided input. if result { - self.agreement_true_outputs.insert(uid); + self.agreement_results.insert(uid, result); + let results1: Vec = self.agreement_results + .iter() + .map(|(_, v)| *v) + .filter(|b| *b) + .collect(); - if self.agreement_true_outputs.len() >= self.num_nodes - self.num_faulty_nodes { + if results1.len() >= self.num_nodes - self.num_faulty_nodes { let instances = &mut self.agreement_instances; for (_uid0, instance) in instances.iter_mut() { if !instance.has_input() { @@ -222,6 +250,7 @@ pub enum Error { UnexpectedMessage, NotImplemented, NoSuchBroadcastInstance, + NoSuchAgreementInstance, Broadcast(broadcast::Error), Agreement(agreement::Error), } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index ba5e2a0..7163d8d 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -67,10 +67,12 @@ impl<'a, T: AsRef<[u8]>> fmt::Debug for HexProof<'a, T> { } /// Messages sent during the binary Byzantine agreement stage. -#[derive(Copy, Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum AgreementMessage { - BVal(bool), - Aux(bool), + /// BVAL message with an epoch. + BVal((u32, bool)), + /// AUX message with an epoch. + Aux((u32, bool)), } impl + From>> Message { @@ -175,8 +177,14 @@ impl AgreementMessage { pub fn into_proto(self) -> AgreementProto { let mut p = AgreementProto::new(); match self { - AgreementMessage::BVal(b) => p.set_bval(b), - AgreementMessage::Aux(b) => p.set_aux(b), + AgreementMessage::BVal((e, b)) => { + p.set_epoch(e); + p.set_bval(b); + } + AgreementMessage::Aux((e, b)) => { + p.set_epoch(e); + p.set_aux(b); + } } p } @@ -184,10 +192,11 @@ impl AgreementMessage { // TODO: Re-enable lint once implemented. #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] pub fn from_proto(mp: AgreementProto) -> Option { + let epoch = mp.get_epoch(); if mp.has_bval() { - Some(AgreementMessage::BVal(mp.get_bval())) + Some(AgreementMessage::BVal((epoch, mp.get_bval()))) } else if mp.has_aux() { - Some(AgreementMessage::Aux(mp.get_aux())) + Some(AgreementMessage::Aux((epoch, mp.get_aux()))) } else { None }