refactored an error! by adding linearizable epochs and wrote a comment on eras

This commit is contained in:
Vladimir Komendantskiy 2018-11-01 21:37:41 +00:00
parent a8586efc81
commit 3deb5f1bce
7 changed files with 120 additions and 62 deletions

View File

@ -2,11 +2,18 @@
//!
//! Like Honey Badger, this protocol allows a network of _N_ nodes with at most _f_ faulty ones,
//! where _3 f < N_, to input "contributions" - any kind of data -, and to agree on a sequence of
//! _batches_ of contributions. The protocol proceeds in _epochs_, starting at number 0, and outputs
//! one batch in each epoch. It never terminates: It handles a continuous stream of incoming
//! _batches_ of contributions. The protocol proceeds in linear _epochs_, starting at number 0, and
//! outputs one batch in each epoch. It never terminates: It handles a continuous stream of incoming
//! contributions and keeps producing new batches from them. All correct nodes will output the same
//! batch for each epoch. Each validator proposes one contribution per epoch, and every batch will
//! contain the contributions of at least _N - f_ validators.
//! Epochs are divided into consecutive intervals called _eras_; the first era starts at epoch 0.
//! Each following era begins immediately after a batch that either
//!
//! - proposes a change in the set of validators, or
//!
//! - finalizes that proposed change.
//!
//! Unlike Honey Badger, this algorithm allows dynamically adding and removing validators.
//! As a signal to initiate converting observers to validators or vice versa, it defines a special
@ -17,10 +24,10 @@
//! create new cryptographic key shares for the new group of validators.
//!
//! The state of that process after each epoch is communicated via the `change` field in `Batch`.
//! When this contains an `InProgress(..)` value, key generation begins. The joining validator (in
//! the case of an `Add` change) must be an observer starting in the following epoch or earlier.
//! When `change` is `Complete(..)`, the following epochs will be produced by the new set of
//! validators.
//! When this contains an `InProgress(..)` value, key generation begins and the following epoch
//! starts the next era. The joining validator (in the case of an `Add` change) must be an observer
//! starting in the following epoch or earlier. When `change` is `Complete(..)`, the following
//! epoch starts the next era with the new set of validators.
//!
//! New observers can only join the network after an epoch where `change` was not `None`. These
//! epochs' batches contain a `JoinPlan`, which can be sent as an invitation to the new node: The
@ -132,6 +139,13 @@ impl<N: Rand> Message<N> {
// NOTE(review): `Ord` is derived here while `PartialOrd` is implemented by hand below and
// documents `Epoch(era, None)` and `Epoch(era, Some(_))` as incomparable. The derived total order
// does compare such values, so `cmp` and `partial_cmp` can disagree - confirm this is intended.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, Serialize, Deserialize)]
pub struct Epoch(pub(super) u64, pub(super) Option<u64>);
/// Embeds a linearizable epoch, given as an `(era, hb_epoch)` pair, into a `DynamicHoneyBadger`
/// `Epoch` whose Honey Badger epoch component is always defined.
impl From<(u64, u64)> for Epoch {
    fn from(lin_epoch: (u64, u64)) -> Self {
        let (era, hb_epoch) = lin_epoch;
        Epoch(era, Some(hb_epoch))
    }
}
impl PartialOrd for Epoch {
/// Partial ordering on epochs. For any `era` and `hb_epoch`, two epochs `Epoch(era, None)` and `Epoch(era,
/// Some(hb_epoch))` are incomparable.
@ -159,6 +173,7 @@ impl Default for Epoch {
impl<N: Rand> Epoched for Message<N> {
type Epoch = Epoch;
type LinEpoch = (u64, u64);
fn epoch(&self) -> Epoch {
match *self {
@ -167,6 +182,11 @@ impl<N: Rand> Epoched for Message<N> {
Message::SignedVote(ref signed_vote) => Epoch(signed_vote.era(), None),
}
}
/// Returns the `(era, hb_epoch)` pair of this message, or `None` when the message's epoch has
/// no Honey Badger epoch component.
fn linearizable_epoch(&self) -> Option<(u64, u64)> {
    match self.epoch() {
        Epoch(era, Some(hb_epoch)) => Some((era, hb_epoch)),
        Epoch(_, None) => None,
    }
}
}
/// The information a new node requires to join the network as an observer. It contains the state

View File

@ -1,4 +1,3 @@
use log::error;
use rand::Rand;
use serde::{de::DeserializeOwned, Serialize};
@ -24,13 +23,13 @@ where
}
}
fn next_epoch(&self) -> Epoch {
fn next_epoch(&self) -> (u64, u64) {
let epoch = self.epoch;
let era = self.era;
if self.change == ChangeState::None {
Epoch(era, Some(epoch - era + 1))
(era, epoch - era + 1)
} else {
Epoch(epoch + 1, Some(0))
(epoch + 1, 0)
}
}
}
@ -39,25 +38,28 @@ impl<N> SenderQueueableMessage for Message<N>
where
N: Rand,
{
fn is_accepted(&self, Epoch(them_era, them_hb_epoch): Epoch, max_future_epochs: u64) -> bool {
let Epoch(era, hb_epoch) = self.epoch();
fn is_accepted(&self, (them_era, them): (u64, u64), max_future_epochs: u64) -> bool {
let Epoch(era, us) = self.epoch();
if era != them_era {
return false;
}
match (hb_epoch, them_hb_epoch) {
(Some(us), Some(them)) => them <= us && us <= them + max_future_epochs,
(None, Some(_)) => true,
(_, None) => {
// TODO: return a Fault.
error!("Peer's Honey Badger epoch undefined");
false
}
if let Some(us) = us {
them <= us && us <= them + max_future_epochs
} else {
true
}
}
fn is_obsolete(&self, Epoch(them_era, them_hb_epoch): Epoch) -> bool {
let Epoch(era, hb_epoch) = self.epoch();
era < them_era || (era == them_era && hb_epoch.is_some() && hb_epoch < them_hb_epoch)
fn is_obsolete(&self, (them_era, them): (u64, u64)) -> bool {
let Epoch(era, us) = self.epoch();
if era < them_era {
return true;
}
if let Some(us) = us {
era == them_era && us < them
} else {
false
}
}
}

View File

@ -43,10 +43,15 @@ pub struct HoneyBadger<C, N: Rand> {
/// Honey Badger epochs are plain numbers, so every epoch is already linearizable.
impl<C, N> Epoched for HoneyBadger<C, N>
where
    N: Rand,
{
    type Epoch = u64;
    type LinEpoch = u64;

    /// Returns the current epoch number.
    fn epoch(&self) -> u64 {
        self.epoch
    }

    /// Returns the current epoch number; it is always defined.
    fn linearizable_epoch(&self) -> Option<u64> {
        let current = self.epoch;
        Some(current)
    }
}
pub type Step<C, N> = ::Step<HoneyBadger<C, N>>;

View File

@ -37,8 +37,13 @@ pub struct Message<N: Rand> {
/// A Honey Badger message belongs to exactly one numeric epoch, which is always linearizable.
impl<N> Epoched for Message<N>
where
    N: Rand,
{
    type Epoch = u64;
    type LinEpoch = u64;

    /// Returns the epoch this message belongs to.
    fn epoch(&self) -> Self::Epoch {
        self.epoch
    }

    /// Returns the epoch this message belongs to; it is always defined.
    fn linearizable_epoch(&self) -> Option<Self::LinEpoch> {
        let message_epoch = self.epoch;
        Some(message_epoch)
    }
}

View File

@ -5,7 +5,7 @@ use Epoched;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum Message<M: Epoched> {
EpochStarted(<M as Epoched>::Epoch),
EpochStarted(<M as Epoched>::LinEpoch),
Algo(M),
}
@ -13,6 +13,7 @@ impl<M> Rand for Message<M>
where
M: Epoched + Rand,
<M as Epoched>::Epoch: Rand,
<M as Epoched>::LinEpoch: Rand,
{
fn rand<R: Rng>(rng: &mut R) -> Self {
let message_type = *rng.choose(&["epoch", "algo"]).unwrap();
@ -28,15 +29,24 @@ where
impl<M> Epoched for Message<M>
where
M: Epoched,
<M as Epoched>::Epoch: From<<M as Epoched>::LinEpoch>,
{
type Epoch = <M as Epoched>::Epoch;
type LinEpoch = <M as Epoched>::LinEpoch;
fn epoch(&self) -> Self::Epoch {
match self {
Message::EpochStarted(epoch) => *epoch,
Message::EpochStarted(epoch) => <M as Epoched>::Epoch::from(*epoch),
Message::Algo(message) => message.epoch(),
}
}
/// Returns the linearizable epoch of this message: the announced epoch for `EpochStarted`, and
/// whatever the wrapped algorithm message reports for `Algo`.
fn linearizable_epoch(&self) -> Option<Self::LinEpoch> {
    match self {
        Message::Algo(inner) => inner.linearizable_epoch(),
        Message::EpochStarted(lin_epoch) => Some(*lin_epoch),
    }
}
}
impl<M: Epoched> From<M> for Message<M> {

View File

@ -19,10 +19,10 @@ pub use self::message::Message;
pub trait SenderQueueableMessage: Epoched {
/// Whether the message is accepted in epoch `them`.
fn is_accepted(&self, them: <Self as Epoched>::Epoch, max_future_epochs: u64) -> bool;
fn is_accepted(&self, them: <Self as Epoched>::LinEpoch, max_future_epochs: u64) -> bool;
/// Whether the epoch of the message is behind `them`.
fn is_obsolete(&self, them: <Self as Epoched>::Epoch) -> bool;
fn is_obsolete(&self, them: <Self as Epoched>::LinEpoch) -> bool;
}
pub trait SenderQueueableOutput<N, M>
@ -35,7 +35,7 @@ where
fn added_node(&self) -> Option<N>;
/// Computes the next epoch after the `DynamicHoneyBadger` epoch of the batch.
fn next_epoch(&self) -> <M as Epoched>::Epoch;
fn next_epoch(&self) -> <M as Epoched>::LinEpoch;
}
pub trait SenderQueueableEpoch
@ -90,13 +90,13 @@ where
algo: D,
/// Our node ID.
our_id: D::NodeId,
/// Current epoch.
epoch: <D::Message as Epoched>::Epoch,
/// Current linearizable epoch of the managed `DistAlgorithm`.
lin_epoch: <D::Message as Epoched>::LinEpoch,
/// Messages that couldn't be handled yet by remote nodes.
outgoing_queue: OutgoingQueue<D>,
/// The set of all remote nodes on the network including validator as well as non-validator
/// (observer) nodes together with their epochs as of the last communication.
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::Epoch>,
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::LinEpoch>,
}
pub type Step<D> = ::Step<SenderQueue<D>>;
@ -109,7 +109,7 @@ where
D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
D::NodeId: NodeIdT + Rand,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
<D::Message as Epoched>::Epoch: SenderQueueableEpoch,
<D::Message as Epoched>::Epoch: SenderQueueableEpoch + From<<D::Message as Epoched>::LinEpoch>,
{
type NodeId = D::NodeId;
type Input = D::Input;
@ -144,7 +144,7 @@ where
D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
D::NodeId: NodeIdT + Rand,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
<D::Message as Epoched>::Epoch: SenderQueueableEpoch,
<D::Message as Epoched>::Epoch: SenderQueueableEpoch + From<<D::Message as Epoched>::LinEpoch>,
{
/// Returns a new `SenderQueueBuilder` configured to manage a given `DynamicHoneyBadger` instance.
pub fn builder<I>(algo: D, peer_ids: I) -> SenderQueueBuilder<D>
@ -156,7 +156,7 @@ where
pub fn handle_input(&mut self, input: D::Input) -> Result<Step<D>, D> {
let mut step = self.algo.handle_input(input)?;
let mut sender_queue_step = self.update_epoch(&step);
let mut sender_queue_step = self.update_lin_epoch(&step);
self.defer_messages(&mut step);
sender_queue_step.extend(step.map(|output| output, Message::from));
Ok(sender_queue_step)
@ -168,7 +168,7 @@ where
message: Message<D::Message>,
) -> Result<Step<D>, D> {
match message {
Message::EpochStarted(epoch) => Ok(self.handle_epoch_started(sender_id, epoch)),
Message::EpochStarted(lin_epoch) => Ok(self.handle_epoch_started(sender_id, lin_epoch)),
Message::Algo(msg) => self.handle_message_content(sender_id, msg),
}
}
@ -177,17 +177,17 @@ where
fn handle_epoch_started(
&mut self,
sender_id: &D::NodeId,
epoch: <D::Message as Epoched>::Epoch,
lin_epoch: <D::Message as Epoched>::LinEpoch,
) -> Step<D> {
self.peer_epochs
.entry(sender_id.clone())
.and_modify(|e| {
if *e < epoch {
*e = epoch;
if *e < lin_epoch {
*e = lin_epoch;
}
}).or_insert(epoch);
self.remove_earlier_messages(sender_id, epoch);
self.process_new_epoch(sender_id, epoch)
}).or_insert(lin_epoch);
self.remove_earlier_messages(sender_id, <D::Message as Epoched>::Epoch::from(lin_epoch));
self.process_new_epoch(sender_id, <D::Message as Epoched>::Epoch::from(lin_epoch))
}
/// Removes all messages queued for the remote node from epochs upto `epoch`.
@ -240,31 +240,31 @@ where
content: D::Message,
) -> Result<Step<D>, D> {
let mut step = self.algo.handle_message(sender_id, content)?;
let mut sender_queue_step = self.update_epoch(&step);
let mut sender_queue_step = self.update_lin_epoch(&step);
self.defer_messages(&mut step);
sender_queue_step.extend(step.map(|output| output, Message::from));
Ok(sender_queue_step)
}
/// Updates the current Honey Badger epoch.
fn update_epoch(&mut self, step: &::Step<D>) -> Step<D> {
fn update_lin_epoch(&mut self, step: &::Step<D>) -> Step<D> {
// Look up `DynamicHoneyBadger` epoch updates and collect any added peers.
let new_epoch = step.output.iter().fold(self.epoch, |epoch, batch| {
let max_epoch = epoch.max(batch.next_epoch());
let new_epoch = step.output.iter().fold(self.lin_epoch, |lin_epoch, batch| {
let max_epoch = lin_epoch.max(batch.next_epoch());
if let Some(node) = batch.added_node() {
if &node != self.our_id() {
self.peer_epochs
.entry(node)
.or_insert_with(<D::Message as Epoched>::Epoch::default);
.or_insert_with(<D::Message as Epoched>::LinEpoch::default);
}
}
max_epoch
});
if new_epoch != self.epoch {
self.epoch = new_epoch;
if new_epoch != self.lin_epoch {
self.lin_epoch = new_epoch;
// Announce the new epoch.
Target::All
.message(Message::EpochStarted(self.epoch))
.message(Message::EpochStarted(self.lin_epoch))
.into()
} else {
Step::default()
@ -301,9 +301,9 @@ where
D::Message: Epoched,
{
algo: D,
epoch: <D::Message as Epoched>::Epoch,
lin_epoch: <D::Message as Epoched>::LinEpoch,
outgoing_queue: OutgoingQueue<D>,
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::Epoch>,
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::LinEpoch>,
}
impl<D> SenderQueueBuilder<D>
@ -312,7 +312,7 @@ where
D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
D::NodeId: NodeIdT + Rand,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
<D::Message as Epoched>::Epoch: SenderQueueableEpoch,
<D::Message as Epoched>::Epoch: SenderQueueableEpoch + From<<D::Message as Epoched>::LinEpoch>,
{
pub fn new<I>(algo: D, peer_ids: I) -> Self
where
@ -320,16 +320,16 @@ where
{
SenderQueueBuilder {
algo,
epoch: <D::Message as Epoched>::Epoch::default(),
lin_epoch: <D::Message as Epoched>::LinEpoch::default(),
outgoing_queue: BTreeMap::default(),
peer_epochs: peer_ids
.map(|id| (id, <D::Message as Epoched>::Epoch::default()))
.map(|id| (id, <D::Message as Epoched>::LinEpoch::default()))
.collect(),
}
}
pub fn epoch(mut self, epoch: <D::Message as Epoched>::Epoch) -> Self {
self.epoch = epoch;
pub fn lin_epoch(mut self, lin_epoch: <D::Message as Epoched>::LinEpoch) -> Self {
self.lin_epoch = lin_epoch;
self
}
@ -340,22 +340,22 @@ where
pub fn peer_epochs(
mut self,
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::Epoch>,
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::LinEpoch>,
) -> Self {
self.peer_epochs = peer_epochs;
self
}
pub fn build(self, our_id: D::NodeId) -> (SenderQueue<D>, Step<D>) {
let epoch = <D::Message as Epoched>::Epoch::default();
let lin_epoch = <D::Message as Epoched>::LinEpoch::default();
let sq = SenderQueue {
algo: self.algo,
our_id,
epoch: self.epoch,
lin_epoch: self.lin_epoch,
outgoing_queue: self.outgoing_queue,
peer_epochs: self.peer_epochs,
};
let step: Step<D> = Target::All.message(Message::EpochStarted(epoch)).into();
let step: Step<D> = Target::All.message(Message::EpochStarted(lin_epoch)).into();
(sq, step)
}
}

View File

@ -29,6 +29,10 @@ impl<M> Message for M where M: Debug + Send + Sync {}
pub trait SessionIdT: Display + Serialize + Send + Sync + Clone {}
impl<S> SessionIdT for S where S: Display + Serialize + Send + Sync + Clone {}
/// Bundle of the bounds required of an epoch type: `Copy`, `Message`, `Default`, `Eq`, `Ord`,
/// `Serialize` and `DeserializeOwned`.
pub trait EpochT: Copy + Message + Default + Eq + Ord + Serialize + DeserializeOwned {}
// Blanket implementation: every type that satisfies the bounds is automatically an `EpochT`.
impl<E> EpochT for E where E: Copy + Message + Default + Eq + Ord + Serialize + DeserializeOwned {}
/// Single algorithm step outcome.
///
/// Each time input (typically in the form of user input or incoming network messages) is provided
@ -203,18 +207,30 @@ where
/// notion of _epoch_. This interface summarizes the properties that are essential for the message
/// sender queue.
pub trait Epoched {
type Epoch: Copy + Message + Default + Eq + Ord + Serialize + DeserializeOwned;
/// Type of epoch. It is not required to be totally ordered.
type Epoch: EpochT;
/// A subtype of `Epoch` which contains sets of "linearizable epochs" such that each of those
/// sets is totally ordered and each has a least element.
type LinEpoch: EpochT;
/// Returns the object's epoch number.
fn epoch(&self) -> Self::Epoch;
/// Returns the object's linearizable epoch number if the object's epoch can be linearized.
fn linearizable_epoch(&self) -> Option<Self::LinEpoch>;
}
/// A targeted message reports the epochs of the message it wraps.
impl<M: Epoched, N> Epoched for TargetedMessage<M, N> {
    type Epoch = M::Epoch;
    type LinEpoch = M::LinEpoch;

    /// Returns the epoch of the wrapped message.
    fn epoch(&self) -> M::Epoch {
        let inner = &self.message;
        inner.epoch()
    }

    /// Returns the linearizable epoch of the wrapped message, if it has one.
    fn linearizable_epoch(&self) -> Option<M::LinEpoch> {
        let inner = &self.message;
        inner.linearizable_epoch()
    }
}
impl<'i, D> Step<D>
@ -229,7 +245,7 @@ where
/// remaining messages can be sent to remote nodes without delay.
pub fn defer_messages(
&mut self,
peer_epochs: &'i BTreeMap<D::NodeId, <D::Message as Epoched>::Epoch>,
peer_epochs: &'i BTreeMap<D::NodeId, <D::Message as Epoched>::LinEpoch>,
max_future_epochs: u64,
) -> Vec<(D::NodeId, D::Message)>
where