Make the incoming message queue finite in Binary Agreement (#329)

* added BA max_future_epochs and limited incoming messages in an epoch

* corrected a comment
This commit is contained in:
Vladimir Komendantskiy 2018-11-12 11:15:02 +00:00 committed by GitHub
parent b3c63774a7
commit e4435d5622
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 136 additions and 19 deletions

View File

@ -3,13 +3,15 @@ use std::sync::Arc;
use std::{fmt, result};
use bincode;
use crypto::SignatureShare;
use log::debug;
use super::bool_multimap::BoolMultimap;
use super::bool_set::BoolSet;
use super::sbv_broadcast::{self, SbvBroadcast};
use super::bool_set::{self, BoolSet};
use super::sbv_broadcast::{self, Message as SbvMessage, SbvBroadcast};
use super::{Error, Message, MessageContent, Result, Step};
use threshold_sign::{self, ThresholdSign};
use fault_log::{Fault, FaultKind};
use threshold_sign::{self, Message as TsMessage, ThresholdSign};
use {DistAlgorithm, NetworkInfo, NodeIdT, SessionIdT, Target};
/// The state of the current epoch's coin. In some epochs this is fixed, in others it starts
@ -38,6 +40,102 @@ impl<N> From<bool> for CoinState<N> {
}
}
/// Binary Agreeement messages received from other nodes for a particular Binary Agreement epoch.
#[derive(Debug)]
struct ReceivedMessages {
/// Received `BVal` messages.
bval: BoolSet,
/// Received `Aux` messages.
aux: BoolSet,
/// Received `Conf` message.
conf: Option<BoolSet>,
/// Received `Term` message.
term: Option<bool>,
/// Received `Coin` message, namely its `SignatureShare`.
coin: Option<SignatureShare>,
}
impl ReceivedMessages {
fn new() -> Self {
ReceivedMessages {
bval: bool_set::NONE,
aux: bool_set::NONE,
conf: None,
term: None,
coin: None,
}
}
/// Inserts new message content if it is accepted or returns a `FaultKind` indicating an issue
/// that prevented insertion of that new content.
fn insert(&mut self, content: MessageContent) -> Option<FaultKind> {
match content {
MessageContent::SbvBroadcast(sbv) => match sbv {
sbv_broadcast::Message::BVal(b) => {
if !self.bval.insert(b) {
return Some(FaultKind::DuplicateBVal);
}
}
sbv_broadcast::Message::Aux(b) => {
if !self.aux.insert(b) {
return Some(FaultKind::DuplicateAux);
}
}
},
MessageContent::Conf(bs) => {
if self.conf.is_none() {
self.conf = Some(bs);
} else {
return Some(FaultKind::MultipleConf);
}
}
MessageContent::Term(b) => {
if self.term.is_none() {
self.term = Some(b);
} else {
return Some(FaultKind::MultipleTerm);
}
}
MessageContent::Coin(msg) => {
if self.coin.is_none() {
self.coin = Some(msg.to_sig().clone());
} else {
return Some(FaultKind::AgreementEpoch);
}
}
}
None
}
/// Creates message content from `ReceivedMessages`. That message content can then be handled.
fn messages(self) -> Vec<MessageContent> {
let ReceivedMessages {
bval,
aux,
conf,
term,
coin,
} = self;
let mut messages = Vec::new();
for b in bval {
messages.push(MessageContent::SbvBroadcast(SbvMessage::BVal(b)));
}
for b in aux {
messages.push(MessageContent::SbvBroadcast(SbvMessage::Aux(b)));
}
if let Some(bs) = conf {
messages.push(MessageContent::Conf(bs));
}
if let Some(b) = term {
messages.push(MessageContent::Term(b));
}
if let Some(ss) = coin {
messages.push(MessageContent::Coin(Box::new(TsMessage::new(ss))));
}
messages
}
}
/// Binary Agreement instance
#[derive(Debug)]
pub struct BinaryAgreement<N, S> {
@ -46,7 +144,9 @@ pub struct BinaryAgreement<N, S> {
/// Session identifier, to prevent replaying messages in other instances.
session_id: S,
/// Binary Agreement algorithm epoch.
epoch: u32,
epoch: u64,
/// Maximum number of future epochs for which incoming messages are accepted.
max_future_epochs: u64,
/// This epoch's Synchronized Binary Value Broadcast instance.
sbv_broadcast: SbvBroadcast<N>,
/// Received `Conf` messages. Reset on every epoch update.
@ -63,8 +163,7 @@ pub struct BinaryAgreement<N, S> {
/// the termination state.
decision: Option<bool>,
/// A cache for messages for future epochs that cannot be handled yet.
// TODO: Find a better solution for this; defend against spam.
incoming_queue: BTreeMap<u32, Vec<(N, MessageContent)>>,
incoming_queue: BTreeMap<u64, BTreeMap<N, ReceivedMessages>>,
/// The values we found in the first _N - f_ `Aux` messages that were in `bin_values`.
conf_values: Option<BoolSet>,
/// The state of this epoch's coin.
@ -105,6 +204,7 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
netinfo: netinfo.clone(),
session_id,
epoch: 0,
max_future_epochs: 1000,
sbv_broadcast: SbvBroadcast::new(netinfo),
received_conf: BTreeMap::new(),
received_term: BoolMultimap::default(),
@ -140,11 +240,20 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
if self.decision.is_some() || (epoch < self.epoch && content.can_expire()) {
// Message is obsolete: We are already in a later epoch or terminated.
Ok(Step::default())
} else if epoch > self.epoch + self.max_future_epochs {
Ok(Fault::new(sender_id.clone(), FaultKind::AgreementEpoch).into())
} else if epoch > self.epoch {
// Message is for a later epoch. We can't handle that yet.
let queue = self.incoming_queue.entry(epoch).or_insert_with(Vec::new);
queue.push((sender_id.clone(), content));
Ok(Step::default())
let epoch_state = self
.incoming_queue
.entry(epoch)
.or_insert_with(BTreeMap::new);
let received = epoch_state
.entry(sender_id.clone())
.or_insert_with(ReceivedMessages::new);
Ok(received.insert(content).map_or(Step::default(), |fault| {
Fault::new(sender_id.clone(), fault).into()
}))
} else {
self.handle_message_content(sender_id, content)
}
@ -386,15 +495,17 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
self.estimated = Some(b);
let sbvb_step = self.sbv_broadcast.send_bval(b)?;
let mut step = self.handle_sbvb_step(sbvb_step)?;
let queued_msgs = self
let epoch_state = self
.incoming_queue
.remove(&self.epoch)
.into_iter()
.flatten();
for (sender_id, content) in queued_msgs {
step.extend(self.handle_message_content(&sender_id, content)?);
if self.decision.is_some() {
break;
for (sender_id, received) in epoch_state {
for m in received.messages() {
step.extend(self.handle_message_content(&sender_id, m)?);
if self.decision.is_some() {
return Ok(step);
}
}
}
Ok(step)

View File

@ -119,7 +119,7 @@ pub enum MessageContent {
impl MessageContent {
/// Creates an message with a given epoch number.
pub fn with_epoch(self, epoch: u32) -> Message {
pub fn with_epoch(self, epoch: u64) -> Message {
Message {
epoch,
content: self,
@ -138,7 +138,7 @@ impl MessageContent {
/// Messages sent during the Binary Agreement stage.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Rand)]
pub struct Message {
pub epoch: u32,
pub epoch: u64,
pub content: MessageContent,
}

View File

@ -22,7 +22,7 @@ use {NetworkInfo, NodeIdT, Target};
pub type Step<N> = ::Step<Message, BoolSet, N>;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub enum Message {
BVal(bool),
Aux(bool),

View File

@ -69,6 +69,12 @@ pub enum FaultKind {
DuplicateBVal,
/// `BinaryAgreement` received a duplicate `Aux` message.
DuplicateAux,
/// `BinaryAgreement` received multiple `Conf` messages.
MultipleConf,
/// `BinaryAgreement` received multiple `Term` messages.
MultipleTerm,
/// `BinaryAgreement` received a message with an epoch too far ahead.
AgreementEpoch,
}
/// A structure representing the context of a faulty node. This structure

View File

@ -232,7 +232,7 @@ struct AbaCommonCoinAdversary {
stage: usize,
stage_progress: usize,
sent_stage_messages: bool,
epoch: u32,
epoch: u64,
coin_state: CoinState<NodeId>,
/// The estimated value for nodes in A.
a_estimated: bool,
@ -250,7 +250,7 @@ impl AbaCommonCoinAdversary {
fn new_with_epoch(
netinfo_mutex: Arc<Mutex<Option<Arc<NetworkInfo<NodeId>>>>>,
epoch: u32,
epoch: u64,
a_estimated: bool,
) -> Self {
AbaCommonCoinAdversary {