use std::collections::BTreeMap; use std::sync::Arc; use std::{fmt, result}; use crate::crypto::SignatureShare; use bincode; use log::debug; use rand::Rng; use super::bool_multimap::BoolMultimap; use super::bool_set::{self, BoolSet}; use super::sbv_broadcast::{self, Message as SbvMessage, SbvBroadcast}; use super::{Error, FaultKind, Message, MessageContent, Result, Step}; use crate::fault_log::Fault; use crate::threshold_sign::{self, Message as TsMessage, ThresholdSign}; use crate::{ConsensusProtocol, NetworkInfo, NodeIdT, SessionIdT, Target}; /// The state of the current epoch's coin. In some epochs this is fixed, in others it starts /// with in `InProgress`. #[derive(Debug)] enum CoinState { /// The value was fixed in the current epoch, or the coin has already terminated. Decided(bool), /// The coin value is not known yet. InProgress(Box>), } impl CoinState { /// Returns the value, if this coin has already decided. fn value(&self) -> Option { match self { CoinState::Decided(value) => Some(*value), CoinState::InProgress(_) => None, } } } impl From for CoinState { fn from(value: bool) -> Self { CoinState::Decided(value) } } /// Binary Agreeement messages received from other nodes for a particular Binary Agreement epoch. #[derive(Debug)] struct ReceivedMessages { /// Received `BVal` messages. bval: BoolSet, /// Received `Aux` messages. aux: BoolSet, /// Received `Conf` message. conf: Option, /// Received `Term` message. term: Option, /// Received `Coin` message, namely its `SignatureShare`. coin: Option, } impl ReceivedMessages { fn new() -> Self { ReceivedMessages { bval: bool_set::NONE, aux: bool_set::NONE, conf: None, term: None, coin: None, } } /// Inserts new message content if it is accepted or returns a `FaultKind` indicating an issue /// that prevented insertion of that new content. fn insert(&mut self, content: MessageContent) -> Option { match content { MessageContent::SbvBroadcast(sbv) => match sbv { sbv_broadcast::Message::BVal(b) => { if !self.bval.insert(b) { return Some(FaultKind::DuplicateBVal); } } sbv_broadcast::Message::Aux(b) => { if !self.aux.insert(b) { return Some(FaultKind::DuplicateAux); } } }, MessageContent::Conf(bs) => { if self.conf.is_none() { self.conf = Some(bs); } else { return Some(FaultKind::MultipleConf); } } MessageContent::Term(b) => { if self.term.is_none() { self.term = Some(b); } else { return Some(FaultKind::MultipleTerm); } } MessageContent::Coin(msg) => { if self.coin.is_none() { self.coin = Some(msg.0); } else { return Some(FaultKind::AgreementEpoch); } } } None } /// Creates message content from `ReceivedMessages`. That message content can then be handled. fn messages(self) -> Vec { let ReceivedMessages { bval, aux, conf, term, coin, } = self; let mut messages = Vec::new(); for b in bval { messages.push(MessageContent::SbvBroadcast(SbvMessage::BVal(b))); } for b in aux { messages.push(MessageContent::SbvBroadcast(SbvMessage::Aux(b))); } if let Some(bs) = conf { messages.push(MessageContent::Conf(bs)); } if let Some(b) = term { messages.push(MessageContent::Term(b)); } if let Some(ss) = coin { messages.push(MessageContent::Coin(Box::new(TsMessage(ss)))); } messages } } /// Binary Agreement instance #[derive(Debug)] pub struct BinaryAgreement { /// Shared network information. netinfo: Arc>, /// Session identifier, to prevent replaying messages in other instances. session_id: S, /// Binary Agreement algorithm epoch. epoch: u64, /// Maximum number of future epochs for which incoming messages are accepted. max_future_epochs: u64, /// This epoch's Synchronized Binary Value Broadcast instance. sbv_broadcast: SbvBroadcast, /// Received `Conf` messages. Reset on every epoch update. received_conf: BTreeMap, /// Received `Term` messages. Kept throughout epoch updates. These count as `BVal`, `Aux` and /// `Conf` messages for all future epochs. received_term: BoolMultimap, /// The estimate of the decision value in the current epoch. estimated: Option, /// A permanent, latching copy of the output value. This copy is required because `output` can /// be consumed using `ConsensusProtocol::next_output` immediately after the instance finishing to /// handle a message, in which case it would otherwise be unknown whether the output value was /// ever there at all. While the output value will still be required in a later epoch to decide /// the termination state. decision: Option, /// A cache for messages for future epochs that cannot be handled yet. incoming_queue: BTreeMap>, /// The values we found in the first _N - f_ `Aux` messages that were in `bin_values`. conf_values: Option, /// The state of this epoch's coin. coin_state: CoinState, } impl ConsensusProtocol for BinaryAgreement { type NodeId = N; type Input = bool; type Output = bool; type Message = Message; type Error = Error; type FaultKind = FaultKind; fn handle_input(&mut self, input: Self::Input, _rng: &mut R) -> Result> { self.propose(input) } /// Receive input from a remote node. fn handle_message( &mut self, sender_id: &Self::NodeId, message: Message, _rng: &mut R, ) -> Result> { self.handle_message(sender_id, message) } /// Whether the algorithm has terminated. fn terminated(&self) -> bool { self.decision.is_some() } fn our_id(&self) -> &Self::NodeId { self.netinfo.our_id() } } impl BinaryAgreement { /// Creates a new `BinaryAgreement` instance with the given session identifier, to prevent /// replaying messages in other instances. pub fn new(netinfo: Arc>, session_id: S) -> Result { Ok(BinaryAgreement { netinfo: netinfo.clone(), session_id, epoch: 0, max_future_epochs: 1000, sbv_broadcast: SbvBroadcast::new(netinfo), received_conf: BTreeMap::new(), received_term: BoolMultimap::default(), estimated: None, decision: None, incoming_queue: BTreeMap::new(), conf_values: None, coin_state: CoinState::Decided(true), }) } /// Proposes a boolean value for Binary Agreement. /// /// If more than two thirds of validators propose the same value, that will eventually be /// output. Otherwise either output is possible. /// /// Note that if `can_propose` returns `false`, it is already too late to affect the outcome. pub fn propose(&mut self, input: bool) -> Result> { if !self.can_propose() { return Ok(Step::default()); } // Set the initial estimated value to the input value. self.estimated = Some(input); let sbvb_step = self.sbv_broadcast.send_bval(input)?; self.handle_sbvb_step(sbvb_step) } /// Handles a message received from `sender_id`. /// /// This must be called with every message we receive from another node. pub fn handle_message(&mut self, sender_id: &N, msg: Message) -> Result> { let Message { epoch, content } = msg; if self.decision.is_some() || (epoch < self.epoch && content.can_expire()) { // Message is obsolete: We are already in a later epoch or terminated. Ok(Step::default()) } else if epoch > self.epoch + self.max_future_epochs { Ok(Fault::new(sender_id.clone(), FaultKind::AgreementEpoch).into()) } else if epoch > self.epoch { // Message is for a later epoch. We can't handle that yet. let epoch_state = self .incoming_queue .entry(epoch) .or_insert_with(BTreeMap::new); let received = epoch_state .entry(sender_id.clone()) .or_insert_with(ReceivedMessages::new); Ok(received .insert(content) .map_or_else(Step::default, |fault| { Fault::new(sender_id.clone(), fault).into() })) } else { self.handle_message_content(sender_id, content) } } /// Whether we can still input a value. It is not an error to input if this returns `false`, /// but it will have no effect on the outcome. pub fn can_propose(&self) -> bool { self.epoch == 0 && self.estimated.is_none() } /// Dispatches the message content to the corresponding handling method. fn handle_message_content( &mut self, sender_id: &N, content: MessageContent, ) -> Result> { match content { MessageContent::SbvBroadcast(msg) => self.handle_sbv_broadcast(sender_id, &msg), MessageContent::Conf(v) => self.handle_conf(sender_id, v), MessageContent::Term(v) => self.handle_term(sender_id, v), MessageContent::Coin(msg) => self.handle_coin(sender_id, *msg), } } /// Handles a Synchroniced Binary Value Broadcast message. fn handle_sbv_broadcast( &mut self, sender_id: &N, msg: &sbv_broadcast::Message, ) -> Result> { let sbvb_step = self.sbv_broadcast.handle_message(sender_id, &msg)?; self.handle_sbvb_step(sbvb_step) } /// Handles a Synchronized Binary Value Broadcast step. On output, starts the `Conf` round or /// decides. fn handle_sbvb_step(&mut self, sbvb_step: sbv_broadcast::Step) -> Result> { let mut step = Step::default(); let output = step.extend_with( sbvb_step, |fault| fault, |msg| MessageContent::SbvBroadcast(msg).with_epoch(self.epoch), ); if self.conf_values.is_some() { return Ok(step); // The `Conf` round has already started. } if let Some(aux_vals) = output.into_iter().next() { // Execute the Coin schedule `false, true, get_coin(), false, true, get_coin(), ...` match self.coin_state { CoinState::Decided(_) => { self.conf_values = Some(aux_vals); step.extend(self.try_update_epoch()?) } CoinState::InProgress(_) => { // Start the `Conf` message round. step.extend(self.send_conf(aux_vals)?) } } } Ok(step) } /// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have /// been received, updates the epoch or decides. fn handle_conf(&mut self, sender_id: &N, v: BoolSet) -> Result> { self.received_conf.insert(sender_id.clone(), v); self.try_finish_conf_round() } /// Handles a `Term(v)` message. If we haven't yet decided on a value and there are more than /// _f_ such messages with the same value from different nodes, performs expedite termination: /// decides on `v`, broadcasts `Term(v)` and terminates the instance. fn handle_term(&mut self, sender_id: &N, b: bool) -> Result> { self.received_term[b].insert(sender_id.clone()); // Check for the expedite termination condition. if self.decision.is_some() { Ok(Step::default()) } else if self.received_term[b].len() > self.netinfo.num_faulty() { Ok(self.decide(b)) } else { // Otherwise handle the `Term` as a `BVal`, `Aux` and `Conf`. let mut sbvb_step = self.sbv_broadcast.handle_bval(sender_id, b)?; sbvb_step.extend(self.sbv_broadcast.handle_aux(sender_id, b)?); let step = self.handle_sbvb_step(sbvb_step)?; Ok(step.join(self.handle_conf(sender_id, BoolSet::from(b))?)) } } /// Handles a `ThresholdSign` message. If there is output, starts the next epoch. The function /// may output a decision value. fn handle_coin(&mut self, sender_id: &N, msg: threshold_sign::Message) -> Result> { let ts_step = match self.coin_state { CoinState::Decided(_) => return Ok(Step::default()), // Coin value is already decided. CoinState::InProgress(ref mut ts) => ts .handle_message(sender_id, msg) .map_err(Error::HandleThresholdSign)?, }; self.on_coin_step(ts_step) } /// Multicasts a `Conf(values)` message, and handles it. fn send_conf(&mut self, values: BoolSet) -> Result> { if self.conf_values.is_some() { // Only one `Conf` message is allowed in an epoch. return Ok(Step::default()); } // Trigger the start of the `Conf` round. self.conf_values = Some(values); if !self.netinfo.is_validator() { return Ok(self.try_finish_conf_round()?); } self.send(MessageContent::Conf(values)) } /// Multicasts and handles a message. Does nothing if we are only an observer. fn send(&mut self, content: MessageContent) -> Result> { if !self.netinfo.is_validator() { return Ok(Step::default()); } let step: Step = Target::all() .message(content.clone().with_epoch(self.epoch)) .into(); let our_id = &self.our_id().clone(); Ok(step.join(self.handle_message_content(our_id, content)?)) } /// Handles a step returned from the `ThresholdSign`. fn on_coin_step(&mut self, ts_step: threshold_sign::Step) -> Result> { let mut step = Step::default(); let epoch = self.epoch; let to_msg = |c_msg| MessageContent::Coin(Box::new(c_msg)).with_epoch(epoch); let ts_output = step.extend_with(ts_step, FaultKind::CoinFault, to_msg); if let Some(sig) = ts_output.into_iter().next() { // Take the parity of the signature as the coin value. self.coin_state = sig.parity().into(); step.extend(self.try_update_epoch()?); } Ok(step) } /// If this epoch's coin value or conf values are not known yet, does nothing, otherwise /// updates the epoch or decides. /// /// With two conf values, the next epoch's estimate is the coin value. If there is only one conf /// value and that disagrees with the coin, the conf value is the next epoch's estimate. If /// the unique conf value agrees with the coin, terminates and decides on that value. fn try_update_epoch(&mut self) -> Result> { if self.decision.is_some() { // Avoid an infinite regression without making a Binary Agreement step. return Ok(Step::default()); } let coin = match self.coin_state.value() { None => return Ok(Step::default()), // Still waiting for coin value. Some(coin) => coin, }; let def_bin_value = match self.conf_values { None => return Ok(Step::default()), // Still waiting for conf value. Some(ref values) => values.definite(), }; if Some(coin) == def_bin_value { Ok(self.decide(coin)) } else { self.update_epoch(def_bin_value.unwrap_or(coin)) } } /// Creates the initial coin state for the current epoch, i.e. sets it to the predetermined /// value, or initializes a `ThresholdSign` instance. fn coin_state(&self) -> Result> { match self.epoch % 3 { 0 => Ok(CoinState::Decided(true)), 1 => Ok(CoinState::Decided(false)), _ => { let coin_id = bincode::serialize(&(&self.session_id, self.epoch))?; let mut ts = ThresholdSign::new(self.netinfo.clone()); ts.set_document(coin_id).map_err(Error::InvokeCoin)?; Ok(CoinState::InProgress(Box::new(ts))) } } } /// Decides on a value and broadcasts a `Term` message with that value. fn decide(&mut self, b: bool) -> Step { if self.decision.is_some() { return Step::default(); } // Output the Binary Agreement value. let mut step = Step::default(); step.output.push(b); // Latch the decided state. self.decision = Some(b); debug!("{}: decision: {}", self, b); if self.netinfo.is_validator() { let msg = MessageContent::Term(b).with_epoch(self.epoch + 1); step.messages.push(Target::all().message(msg)); } step } /// Checks whether the _N - f_ `Conf` messages have arrived, and if so, activates the coin. fn try_finish_conf_round(&mut self) -> Result> { if self.conf_values.is_none() || self.count_conf() < self.netinfo.num_correct() { return Ok(Step::default()); } // Invoke the coin. let ts_step = match self.coin_state { CoinState::Decided(_) => return Ok(Step::default()), // Coin has already decided. CoinState::InProgress(ref mut ts) => ts.sign().map_err(Error::InvokeCoin)?, }; Ok(self.on_coin_step(ts_step)?.join(self.try_update_epoch()?)) } /// Counts the number of received `Conf` messages with values in `bin_values`. fn count_conf(&self) -> usize { let is_bin_val = |conf: &&BoolSet| conf.is_subset(self.sbv_broadcast.bin_values()); self.received_conf.values().filter(is_bin_val).count() } /// Increments the epoch, sets the new estimate and handles queued messages. fn update_epoch(&mut self, b: bool) -> Result> { self.sbv_broadcast.clear(&self.received_term); self.received_conf.clear(); for (v, id) in &self.received_term { self.received_conf.insert(id.clone(), BoolSet::from(v)); } self.conf_values = None; self.epoch += 1; self.coin_state = self.coin_state()?; debug!( "{}: epoch started, {} terminated", self, self.received_conf.len(), ); self.estimated = Some(b); let sbvb_step = self.sbv_broadcast.send_bval(b)?; let mut step = self.handle_sbvb_step(sbvb_step)?; let epoch = self.epoch; let epoch_state = self.incoming_queue.remove(&epoch).into_iter().flatten(); for (sender_id, received) in epoch_state { for m in received.messages() { step.extend(self.handle_message_content(&sender_id, m)?); if self.decision.is_some() || self.epoch > epoch { return Ok(step); } } } Ok(step) } } impl fmt::Display for BinaryAgreement { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> result::Result<(), fmt::Error> { write!( f, "{:?} BA {} epoch {} ({})", self.our_id(), self.session_id, self.epoch, if self.netinfo.is_validator() { "validator" } else { "observer" } ) } }