Simplify the sender queue.

Remove the distinction between linearized and regular epochs.
Avoid iterating through the whole outgoing queue on epoch change.
This commit is contained in:
Andreas Fackler 2018-11-07 17:21:24 +01:00 committed by Andreas Fackler
parent 5ea0b92484
commit d0b96f2dc8
12 changed files with 169 additions and 321 deletions

View File

@ -21,7 +21,7 @@ use honey_badger::{self, HoneyBadger, Message as HbMessage};
use sync_key_gen::{Ack, AckOutcome, Part, PartOutcome, SyncKeyGen};
use threshold_decrypt::EncryptionSchedule;
use util::{self, SubRng};
use {Contribution, DistAlgorithm, NetworkInfo, NodeIdT, Target};
use {Contribution, DistAlgorithm, Epoched, NetworkInfo, NodeIdT, Target};
/// A Honey Badger instance that can handle adding and removing nodes.
#[derive(Derivative)]
@ -493,3 +493,15 @@ where
write!(f, "{:?} DHB(era: {})", self.our_id(), self.era)
}
}
impl<C, N> Epoched for DynamicHoneyBadger<C, N>
where
C: Contribution + Serialize + DeserializeOwned,
N: NodeIdT + Serialize + DeserializeOwned + Rand,
{
type Epoch = (u64, u64);
fn epoch(&self) -> (u64, u64) {
(self.era, self.honey_badger.epoch())
}
}

View File

@ -72,7 +72,6 @@ mod dynamic_honey_badger;
mod error;
mod votes;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use crypto::{PublicKey, PublicKeySet, Signature};
@ -83,7 +82,7 @@ use self::votes::{SignedVote, VoteCounter};
use super::threshold_decrypt::EncryptionSchedule;
use honey_badger::Message as HbMessage;
use sync_key_gen::{Ack, Part, SyncKeyGen};
use {Epoched, NodeIdT};
use NodeIdT;
pub use self::batch::Batch;
pub use self::builder::DynamicHoneyBadgerBuilder;
@ -134,62 +133,6 @@ impl<N: Rand> Message<N> {
}
}
/// Dynamic Honey Badger epoch. It consists of an era and an epoch of Honey Badger that started in
/// that era. For messages originating from `DynamicHoneyBadger` as opposed to `HoneyBadger`, that
/// HoneyBadger epoch is `None`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, Serialize, Deserialize)]
pub struct Epoch(pub(super) u64, pub(super) Option<u64>);
/// The injection of linearizable epochs into `DynamicHoneyBadger` epochs.
impl From<(u64, u64)> for Epoch {
fn from((era, hb_epoch): (u64, u64)) -> Epoch {
Epoch(era, Some(hb_epoch))
}
}
impl PartialOrd for Epoch {
/// Partial ordering on epochs. For any `era` and `hb_epoch`, two epochs `Epoch(era, None)` and `Epoch(era,
/// Some(hb_epoch))` are incomparable.
fn partial_cmp(&self, other: &Epoch) -> Option<Ordering> {
let (&Epoch(a, b), &Epoch(c, d)) = (self, other);
if a < c {
Some(Ordering::Less)
} else if a > c {
Some(Ordering::Greater)
} else if b.is_none() && d.is_none() {
Some(Ordering::Equal)
} else if let (Some(b), Some(d)) = (b, d) {
Some(Ord::cmp(&b, &d))
} else {
None
}
}
}
impl Default for Epoch {
fn default() -> Epoch {
Epoch(0, Some(0))
}
}
impl<N: Rand> Epoched for Message<N> {
type Epoch = Epoch;
type LinEpoch = (u64, u64);
fn epoch(&self) -> Epoch {
match *self {
Message::HoneyBadger(era, ref msg) => Epoch(era, Some(msg.epoch())),
Message::KeyGen(era, _, _) => Epoch(era, None),
Message::SignedVote(ref signed_vote) => Epoch(signed_vote.era(), None),
}
}
fn linearizable_epoch(&self) -> Option<(u64, u64)> {
let Epoch(era, hb_epoch) = self.epoch();
hb_epoch.map(|hb_epoch| (era, hb_epoch))
}
}
/// The information a new node requires to join the network as an observer. It contains the state
/// of voting and key generation after a specific epoch, so that the new node will be in sync if it
/// joins in the next one.

View File

@ -8,7 +8,7 @@ use serde::{de::DeserializeOwned, Serialize};
use super::epoch_state::EpochState;
use super::{Batch, Error, ErrorKind, HoneyBadgerBuilder, Message, Result};
use {util, Contribution, DistAlgorithm, Epoched, Fault, FaultKind, NetworkInfo, NodeIdT};
use {util, Contribution, DistAlgorithm, Fault, FaultKind, NetworkInfo, NodeIdT};
pub use super::epoch_state::SubsetHandlingStrategy;
use threshold_decrypt::EncryptionSchedule;
@ -40,19 +40,6 @@ pub struct HoneyBadger<C, N: Rand> {
pub(super) encryption_schedule: EncryptionSchedule,
}
impl<C, N: Rand> Epoched for HoneyBadger<C, N> {
type Epoch = u64;
type LinEpoch = u64;
fn epoch(&self) -> Self::Epoch {
self.epoch
}
fn linearizable_epoch(&self) -> Option<Self::LinEpoch> {
Some(self.epoch)
}
}
pub type Step<C, N> = ::DaStep<HoneyBadger<C, N>>;
impl<C, N> DistAlgorithm for HoneyBadger<C, N>
@ -127,16 +114,16 @@ where
return Err(ErrorKind::UnknownSender.into());
}
let Message { epoch, content } = message;
if self.epoch <= epoch && epoch <= self.epoch + self.max_future_epochs {
if epoch > self.epoch + self.max_future_epochs {
Ok(Fault::new(sender_id.clone(), FaultKind::UnexpectedHbMessageEpoch).into())
} else if epoch < self.epoch {
// The message is late; discard it.
Ok(Step::default())
} else {
let step = self
.epoch_state_mut(epoch)?
.handle_message_content(sender_id, content)?;
Ok(step.join(self.try_output_batches()?))
} else if epoch > self.epoch + self.max_future_epochs {
Ok(Fault::new(sender_id.clone(), FaultKind::UnexpectedHbMessageEpoch).into())
} else {
// The message is late; discard it.
Ok(Step::default())
}
}
@ -145,10 +132,17 @@ where
!self.netinfo.is_validator() || self.has_input
}
/// Returns the current encryption schedule that determines in which epochs contributions are
/// encrypted.
pub fn get_encryption_schedule(&self) -> EncryptionSchedule {
self.encryption_schedule
}
/// Returns the current epoch.
pub fn epoch(&self) -> u64 {
self.epoch
}
/// Returns the number of validators from which we have already received a proposal for the
/// current epoch.
pub(crate) fn received_proposals(&self) -> usize {

View File

@ -5,8 +5,6 @@ use serde_derive::{Deserialize, Serialize};
use subset;
use threshold_decrypt;
use Epoched;
/// The content of a `HoneyBadger` message. It should be further annotated with an epoch.
#[derive(Clone, Debug, Deserialize, Rand, Serialize)]
pub enum MessageContent<N: Rand> {
@ -35,15 +33,9 @@ pub struct Message<N: Rand> {
pub(super) content: MessageContent<N>,
}
impl<N: Rand> Epoched for Message<N> {
type Epoch = u64;
type LinEpoch = u64;
fn epoch(&self) -> u64 {
impl<N: Rand> Message<N> {
/// Returns this message's Honey Badger epoch.
pub fn epoch(&self) -> u64 {
self.epoch
}
fn linearizable_epoch(&self) -> Option<u64> {
Some(self.epoch)
}
}

View File

@ -36,7 +36,7 @@ use dynamic_honey_badger::{self, Batch as DhbBatch, DynamicHoneyBadger, Message}
use transaction_queue::TransactionQueue;
use {util, Contribution, DistAlgorithm, NodeIdT};
pub use dynamic_honey_badger::{Change, ChangeState, Epoch, Input, NodeChange};
pub use dynamic_honey_badger::{Change, ChangeState, Input, NodeChange};
/// Queueing honey badger error variants.
#[derive(Debug, Fail)]

View File

@ -7,13 +7,12 @@ use rand::Rand;
use serde::{de::DeserializeOwned, Serialize};
use super::{
SenderQueue, SenderQueueableDistAlgorithm, SenderQueueableEpoch, SenderQueueableMessage,
SenderQueueableOutput,
SenderQueue, SenderQueueableDistAlgorithm, SenderQueueableMessage, SenderQueueableOutput,
};
use {Contribution, DaStep, Epoched, NodeIdT};
use dynamic_honey_badger::{
Batch, Change, ChangeState, DynamicHoneyBadger, Epoch, Error as DhbError, Message, NodeChange,
Batch, Change, ChangeState, DynamicHoneyBadger, Error as DhbError, Message, NodeChange,
};
impl<C, N> SenderQueueableOutput<N, Message<N>> for Batch<C, N>
@ -31,14 +30,16 @@ where
None
}
}
}
fn next_epoch(&self) -> (u64, u64) {
let epoch = self.epoch();
let era = self.era();
if *self.change() == ChangeState::None {
(era, epoch - era + 1)
} else {
(epoch + 1, 0)
impl<N: Rand> Epoched for Message<N> {
type Epoch = (u64, u64);
fn epoch(&self) -> (u64, u64) {
match *self {
Message::HoneyBadger(era, ref msg) => (era, msg.epoch()),
Message::KeyGen(era, _, _) => (era, 0),
Message::SignedVote(ref signed_vote) => (signed_vote.era(), 0),
}
}
}
@ -47,37 +48,23 @@ impl<N> SenderQueueableMessage for Message<N>
where
N: Rand,
{
fn is_accepted(&self, (them_era, them): (u64, u64), max_future_epochs: u64) -> bool {
let Epoch(era, us) = self.epoch();
if era != them_era {
return false;
}
if let Some(us) = us {
them <= us && us <= them + max_future_epochs
} else {
true
fn is_premature(&self, (them_era, them): (u64, u64), max_future_epochs: u64) -> bool {
match *self {
Message::HoneyBadger(era, ref msg) => {
era > them_era || (era == them_era && msg.epoch() > them + max_future_epochs)
}
Message::KeyGen(era, _, _) => era > them_era,
Message::SignedVote(ref signed_vote) => signed_vote.era() > them_era,
}
}
fn is_obsolete(&self, (them_era, them): (u64, u64)) -> bool {
let Epoch(era, us) = self.epoch();
if era < them_era {
return true;
}
if let Some(us) = us {
era == them_era && us < them
} else {
false
}
}
}
impl SenderQueueableEpoch for Epoch {
fn spanning_epochs(&self) -> Vec<Self> {
if let Epoch(era, Some(_)) = *self {
vec![Epoch(era, None)]
} else {
vec![]
match *self {
Message::HoneyBadger(era, ref msg) => {
era < them_era || (era == them_era && msg.epoch() < them)
}
Message::KeyGen(era, _, _) => era < them_era,
Message::SignedVote(ref signed_vote) => signed_vote.era() < them_era,
}
}
}

View File

@ -1,10 +1,7 @@
use rand::Rand;
use serde::{de::DeserializeOwned, Serialize};
use super::{
SenderQueueableDistAlgorithm, SenderQueueableEpoch, SenderQueueableMessage,
SenderQueueableOutput,
};
use super::{SenderQueueableDistAlgorithm, SenderQueueableMessage, SenderQueueableOutput};
use honey_badger::{Batch, HoneyBadger, Message};
use {Contribution, Epoched, NodeIdT};
@ -16,9 +13,13 @@ where
fn added_node(&self) -> Option<N> {
None
}
}
fn next_epoch(&self) -> u64 {
self.epoch + 1
impl<N: Rand> Epoched for Message<N> {
type Epoch = u64;
fn epoch(&self) -> u64 {
self.epoch()
}
}
@ -26,9 +27,8 @@ impl<N> SenderQueueableMessage for Message<N>
where
N: Rand,
{
fn is_accepted(&self, them: u64, max_future_epochs: u64) -> bool {
let our_epoch = self.epoch();
them <= our_epoch && our_epoch <= them + max_future_epochs
fn is_premature(&self, them: u64, max_future_epochs: u64) -> bool {
self.epoch() > them + max_future_epochs
}
fn is_obsolete(&self, them: u64) -> bool {
@ -36,9 +36,15 @@ where
}
}
impl SenderQueueableEpoch for u64 {
fn spanning_epochs(&self) -> Vec<Self> {
vec![]
impl<C, N> Epoched for HoneyBadger<C, N>
where
C: Contribution + Serialize + DeserializeOwned,
N: NodeIdT + Rand,
{
type Epoch = u64;
fn epoch(&self) -> u64 {
self.epoch()
}
}

View File

@ -5,7 +5,7 @@ use Epoched;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum Message<M: Epoched> {
EpochStarted(<M as Epoched>::LinEpoch),
EpochStarted(<M as Epoched>::Epoch),
Algo(M),
}
@ -13,7 +13,6 @@ impl<M> Rand for Message<M>
where
M: Epoched + Rand,
<M as Epoched>::Epoch: Rand,
<M as Epoched>::LinEpoch: Rand,
{
fn rand<R: Rng>(rng: &mut R) -> Self {
let message_type = *rng.choose(&["epoch", "algo"]).unwrap();
@ -26,29 +25,6 @@ where
}
}
impl<M> Epoched for Message<M>
where
M: Epoched,
<M as Epoched>::Epoch: From<<M as Epoched>::LinEpoch>,
{
type Epoch = <M as Epoched>::Epoch;
type LinEpoch = <M as Epoched>::LinEpoch;
fn epoch(&self) -> Self::Epoch {
match self {
Message::EpochStarted(epoch) => <M as Epoched>::Epoch::from(*epoch),
Message::Algo(message) => message.epoch(),
}
}
fn linearizable_epoch(&self) -> Option<Self::LinEpoch> {
match self {
Message::EpochStarted(epoch) => Some(*epoch),
Message::Algo(message) => message.linearizable_epoch(),
}
}
}
impl<M: Epoched> From<M> for Message<M> {
fn from(message: M) -> Self {
Message::Algo(message)

View File

@ -14,19 +14,21 @@ pub mod queueing_honey_badger;
use std::collections::BTreeMap;
use std::fmt::Debug;
use rand::Rand;
use serde::{de::DeserializeOwned, Serialize};
use {DaStep, DistAlgorithm, Epoched, NodeIdT, Target};
pub use self::message::Message;
pub trait SenderQueueableMessage: Epoched {
/// Whether the message is accepted in epoch `them`.
fn is_accepted(&self, them: <Self as Epoched>::LinEpoch, max_future_epochs: u64) -> bool;
/// Whether the message needs to be deferred.
fn is_premature(&self, them: <Self as Epoched>::Epoch, max_future_epochs: u64) -> bool;
/// Whether the epoch of the message is behind `them`.
fn is_obsolete(&self, them: <Self as Epoched>::LinEpoch) -> bool;
fn is_obsolete(&self, them: <Self as Epoched>::Epoch) -> bool;
/// Whether the message is neither obsolete nor premature.
fn is_accepted(&self, them: <Self as Epoched>::Epoch, max_future_epochs: u64) -> bool {
!self.is_premature(them, max_future_epochs) && !self.is_obsolete(them)
}
}
pub trait SenderQueueableOutput<N, M>
@ -37,29 +39,9 @@ where
/// Returns an optional new node added with the batch. This node should be added to the set of
/// all nodes.
fn added_node(&self) -> Option<N>;
/// Computes the next epoch after the `DynamicHoneyBadger` epoch of the batch.
fn next_epoch(&self) -> <M as Epoched>::LinEpoch;
}
pub trait SenderQueueableEpoch
where
Self: Sized,
{
/// A _spanning epoch_ of an epoch `e` is an epoch `e0` such that
///
/// - `e` and `e0` are incomparable by the partial ordering on epochs and
///
/// - the duration of `e0` is at least that of `e`.
///
/// Returned is a list of spanning epochs for the given epoch.
///
/// For example, any `DynamicHoneyBadger` epoch `Epoch((x, Some(y)))` has a unique spanning
/// epoch `Epoch((x, None))`. In turn, no epoch `Epoch((x, None))` has a spanning epoch.
fn spanning_epochs(&self) -> Vec<Self>;
}
pub trait SenderQueueableDistAlgorithm
pub trait SenderQueueableDistAlgorithm: Epoched
where
Self: DistAlgorithm,
{
@ -69,11 +51,8 @@ where
}
pub type OutgoingQueue<D> = BTreeMap<
(
<D as DistAlgorithm>::NodeId,
<<D as DistAlgorithm>::Message as Epoched>::Epoch,
),
Vec<<D as DistAlgorithm>::Message>,
<D as DistAlgorithm>::NodeId,
BTreeMap<<<D as DistAlgorithm>::Message as Epoched>::Epoch, Vec<<D as DistAlgorithm>::Message>>,
>;
/// An instance of `DistAlgorithm` wrapped with a queue of outgoing messages, that is, a sender
@ -86,32 +65,27 @@ pub type OutgoingQueue<D> = BTreeMap<
pub struct SenderQueue<D>
where
D: SenderQueueableDistAlgorithm,
D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
D::NodeId: NodeIdT + Rand,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
D::Message: Epoched,
{
/// The managed `DistAlgorithm` instance.
algo: D,
/// Our node ID.
our_id: D::NodeId,
/// Current linearizable epoch of the managed `DistAlgorithm`.
lin_epoch: <D::Message as Epoched>::LinEpoch,
/// Messages that couldn't be handled yet by remote nodes.
outgoing_queue: OutgoingQueue<D>,
/// The set of all remote nodes on the network including validator as well as non-validator
/// (observer) nodes together with their epochs as of the last communication.
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::LinEpoch>,
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::Epoch>,
}
pub type Step<D> = ::DaStep<SenderQueue<D>>;
impl<D> DistAlgorithm for SenderQueue<D>
where
D: SenderQueueableDistAlgorithm + Debug + Send + Sync,
D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
D::NodeId: NodeIdT + Rand,
D: SenderQueueableDistAlgorithm + Debug,
D::Message: Clone + SenderQueueableMessage + Epoched<Epoch = <D as Epoched>::Epoch>,
D::NodeId: NodeIdT,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
<D::Message as Epoched>::Epoch: SenderQueueableEpoch + From<<D::Message as Epoched>::LinEpoch>,
{
type NodeId = D::NodeId;
type Input = D::Input;
@ -142,11 +116,10 @@ where
impl<D> SenderQueue<D>
where
D: SenderQueueableDistAlgorithm + Debug + Send + Sync,
D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
D::NodeId: NodeIdT + Rand,
D: SenderQueueableDistAlgorithm + Debug,
D::Message: Clone + SenderQueueableMessage + Epoched<Epoch = <D as Epoched>::Epoch>,
D::NodeId: NodeIdT,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
<D::Message as Epoched>::Epoch: SenderQueueableEpoch + From<<D::Message as Epoched>::LinEpoch>,
{
/// Returns a new `SenderQueueBuilder` configured to manage a given `DynamicHoneyBadger`
/// instance.
@ -167,7 +140,7 @@ where
message: Message<D::Message>,
) -> Result<DaStep<Self>, D::Error> {
match message {
Message::EpochStarted(lin_epoch) => Ok(self.handle_epoch_started(sender_id, lin_epoch)),
Message::EpochStarted(epoch) => Ok(self.handle_epoch_started(sender_id, epoch)),
Message::Algo(msg) => self.handle_message_content(sender_id, msg),
}
}
@ -179,7 +152,7 @@ where
F: FnOnce(&mut D) -> Result<DaStep<D>, D::Error>,
{
let mut step = f(&mut self.algo)?;
let mut sender_queue_step = self.update_lin_epoch(&step);
let mut sender_queue_step = self.update_epoch(&step);
self.defer_messages(&mut step);
sender_queue_step.extend(step.map(|output| output, Message::from));
Ok(sender_queue_step)
@ -189,34 +162,16 @@ where
fn handle_epoch_started(
&mut self,
sender_id: &D::NodeId,
lin_epoch: <D::Message as Epoched>::LinEpoch,
epoch: <D::Message as Epoched>::Epoch,
) -> DaStep<Self> {
self.peer_epochs
.entry(sender_id.clone())
.and_modify(|e| {
if *e < lin_epoch {
*e = lin_epoch;
if *e < epoch {
*e = epoch;
}
}).or_insert(lin_epoch);
self.remove_earlier_messages(sender_id, <D::Message as Epoched>::Epoch::from(lin_epoch));
self.process_new_epoch(sender_id, <D::Message as Epoched>::Epoch::from(lin_epoch))
}
/// Removes all messages queued for the remote node from epochs upto `epoch`.
fn remove_earlier_messages(
&mut self,
sender_id: &D::NodeId,
epoch: <D::Message as Epoched>::Epoch,
) {
let earlier_keys: Vec<_> = self
.outgoing_queue
.keys()
.cloned()
.filter(|(id, this_epoch)| id == sender_id && *this_epoch < epoch)
.collect();
for key in earlier_keys {
self.outgoing_queue.remove(&key);
}
}).or_insert(epoch);
self.process_new_epoch(sender_id, epoch)
}
/// Processes an announcement of a new epoch update received from a remote node.
@ -225,24 +180,22 @@ where
sender_id: &D::NodeId,
epoch: <D::Message as Epoched>::Epoch,
) -> DaStep<Self> {
// Send any HB messages for the HB epoch.
let mut ready_messages = self
.outgoing_queue
.remove(&(sender_id.clone(), epoch))
.unwrap_or_default();
for u in epoch.spanning_epochs() {
// Send any DHB messages for the DHB era.
ready_messages.extend(
self.outgoing_queue
.remove(&(sender_id.clone(), u))
.unwrap_or_default(),
);
}
Step::<D>::from(
ready_messages
.into_iter()
.map(|msg| Target::Node(sender_id.clone()).message(Message::Algo(msg))),
)
let queue = match self.outgoing_queue.get_mut(sender_id) {
None => return DaStep::<Self>::default(),
Some(queue) => queue,
};
let earlier_keys: Vec<_> = queue
.keys()
.cloned()
.take_while(|this_epoch| *this_epoch <= epoch)
.collect();
earlier_keys
.into_iter()
.filter_map(|key| queue.remove(&key))
.flatten()
.filter(|msg| !msg.is_obsolete(epoch))
.map(|msg| Target::Node(sender_id.clone()).message(Message::Algo(msg)))
.into()
}
/// Handles a Honey Badger algorithm message in a given epoch.
@ -255,28 +208,20 @@ where
}
/// Updates the current Honey Badger epoch.
fn update_lin_epoch(&mut self, step: &DaStep<D>) -> DaStep<Self> {
// Look up `DynamicHoneyBadger` epoch updates and collect any added peers.
let new_epoch = step.output.iter().fold(self.lin_epoch, |lin_epoch, batch| {
let max_epoch = lin_epoch.max(batch.next_epoch());
if let Some(node) = batch.added_node() {
if &node != self.our_id() {
self.peer_epochs
.entry(node)
.or_insert_with(<D::Message as Epoched>::LinEpoch::default);
}
}
max_epoch
});
if new_epoch != self.lin_epoch {
self.lin_epoch = new_epoch;
// Announce the new epoch.
Target::All
.message(Message::EpochStarted(self.lin_epoch))
.into()
} else {
Step::<D>::default()
fn update_epoch(&mut self, step: &DaStep<D>) -> DaStep<Self> {
if step.output.is_empty() {
return Step::<D>::default();
}
// Look up `DynamicHoneyBadger` epoch updates and collect any added peers.
for node in step.output.iter().filter_map(|batch| batch.added_node()) {
if &node != self.our_id() {
self.peer_epochs.entry(node).or_default();
}
}
// Announce the new epoch.
Target::All
.message(Message::EpochStarted(self.algo.epoch()))
.into()
}
/// Removes any messages to nodes at earlier epochs from the given `Step`. This may involve
@ -287,10 +232,11 @@ where
let max_future_epochs = self.algo.max_future_epochs();
// Append the deferred messages onto the queues.
for (id, message) in step.defer_messages(&self.peer_epochs, max_future_epochs) {
let epoch = message.epoch();
self.outgoing_queue
.entry((id, epoch))
.or_insert_with(Vec::new)
.entry(id)
.or_default()
.entry(message.epoch())
.or_default()
.push(message);
}
}
@ -309,18 +255,16 @@ where
D::Message: Epoched,
{
algo: D,
lin_epoch: <D::Message as Epoched>::LinEpoch,
outgoing_queue: OutgoingQueue<D>,
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::LinEpoch>,
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::Epoch>,
}
impl<D> SenderQueueBuilder<D>
where
D: SenderQueueableDistAlgorithm + Debug + Send + Sync,
D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
D::NodeId: NodeIdT + Rand,
D: SenderQueueableDistAlgorithm + Debug,
D::Message: Clone + SenderQueueableMessage + Epoched<Epoch = <D as Epoched>::Epoch>,
D::NodeId: NodeIdT,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
<D::Message as Epoched>::Epoch: SenderQueueableEpoch + From<<D::Message as Epoched>::LinEpoch>,
{
pub fn new<I>(algo: D, peer_ids: I) -> Self
where
@ -328,19 +272,13 @@ where
{
SenderQueueBuilder {
algo,
lin_epoch: <D::Message as Epoched>::LinEpoch::default(),
outgoing_queue: BTreeMap::default(),
peer_epochs: peer_ids
.map(|id| (id, <D::Message as Epoched>::LinEpoch::default()))
.map(|id| (id, <D::Message as Epoched>::Epoch::default()))
.collect(),
}
}
pub fn lin_epoch(mut self, lin_epoch: <D::Message as Epoched>::LinEpoch) -> Self {
self.lin_epoch = lin_epoch;
self
}
pub fn outgoing_queue(mut self, outgoing_queue: OutgoingQueue<D>) -> Self {
self.outgoing_queue = outgoing_queue;
self
@ -348,22 +286,21 @@ where
pub fn peer_epochs(
mut self,
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::LinEpoch>,
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::Epoch>,
) -> Self {
self.peer_epochs = peer_epochs;
self
}
pub fn build(self, our_id: D::NodeId) -> (SenderQueue<D>, DaStep<SenderQueue<D>>) {
let lin_epoch = <D::Message as Epoched>::LinEpoch::default();
let epoch = self.algo.epoch();
let sq = SenderQueue {
algo: self.algo,
our_id,
lin_epoch: self.lin_epoch,
outgoing_queue: self.outgoing_queue,
peer_epochs: self.peer_epochs,
};
let step = Target::All.message(Message::EpochStarted(lin_epoch)).into();
let step = Target::All.message(Message::EpochStarted(epoch)).into();
(sq, step)
}
}

View File

@ -9,7 +9,20 @@ use serde::{de::DeserializeOwned, Serialize};
use super::{SenderQueue, SenderQueueableDistAlgorithm};
use queueing_honey_badger::{Change, Error as QhbError, QueueingHoneyBadger};
use transaction_queue::TransactionQueue;
use {Contribution, DaStep, NodeIdT};
use {Contribution, DaStep, Epoched, NodeIdT};
impl<T, N, Q> Epoched for QueueingHoneyBadger<T, N, Q>
where
T: Contribution + Serialize + DeserializeOwned + Clone,
N: NodeIdT + Serialize + DeserializeOwned + Rand,
Q: TransactionQueue<T>,
{
type Epoch = (u64, u64);
fn epoch(&self) -> (u64, u64) {
self.dyn_hb().epoch()
}
}
impl<T, N, Q> SenderQueueableDistAlgorithm for QueueingHoneyBadger<T, N, Q>
where

View File

@ -6,7 +6,6 @@ use std::hash::Hash;
use std::iter::once;
use failure::Fail;
use rand::Rand;
use serde::{de::DeserializeOwned, Serialize};
use fault_log::{Fault, FaultLog};
@ -180,30 +179,19 @@ where
/// notion of _epoch_. This interface summarizes the properties that are essential for the message
/// sender queue.
pub trait Epoched {
/// Type of epoch. It is not required to be totally ordered.
/// Type of epoch.
type Epoch: EpochT;
/// A subtype of `Epoch` which contains sets of "linearizable epochs" such that each of those
/// sets is totally ordered and each has a least element.
type LinEpoch: EpochT;
/// Returns the object's epoch number.
fn epoch(&self) -> Self::Epoch;
/// Returns the object's linearizable epoch number if the object's epoch can be linearized.
fn linearizable_epoch(&self) -> Option<Self::LinEpoch>;
}
impl<M: Epoched, N> Epoched for TargetedMessage<M, N> {
type Epoch = <M as Epoched>::Epoch;
type LinEpoch = <M as Epoched>::LinEpoch;
fn epoch(&self) -> Self::Epoch {
self.message.epoch()
}
fn linearizable_epoch(&self) -> Option<Self::LinEpoch> {
self.message.linearizable_epoch()
}
}
/// An alias for the type of `Step` returned by `D`'s methods.
@ -212,15 +200,15 @@ pub type DaStep<D> =
impl<'i, M, O, N> Step<M, O, N>
where
N: NodeIdT + Rand,
M: 'i + Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
N: NodeIdT,
M: 'i + Clone + SenderQueueableMessage,
{
/// Removes and returns any messages that are not yet accepted by remote nodes according to the
/// mapping `remote_epochs`. This way the returned messages are postponed until later, and the
/// remaining messages can be sent to remote nodes without delay.
pub fn defer_messages(
&mut self,
peer_epochs: &BTreeMap<N, <M as Epoched>::LinEpoch>,
peer_epochs: &BTreeMap<N, <M as Epoched>::Epoch>,
max_future_epochs: u64,
) -> Vec<(N, M)> {
let mut deferred_msgs: Vec<(N, M)> = Vec::new();
@ -229,10 +217,10 @@ where
match msg.target.clone() {
Target::Node(id) => {
if let Some(&them) = peer_epochs.get(&id) {
if msg.message.is_accepted(them, max_future_epochs) {
passed_msgs.push(msg);
} else if !msg.message.is_obsolete(them) {
if msg.message.is_premature(them, max_future_epochs) {
deferred_msgs.push((id, msg.message));
} else if !msg.message.is_obsolete(them) {
passed_msgs.push(msg);
}
}
}
@ -246,11 +234,11 @@ where
// The `Target::All` message is split into two sets of point messages: those
// which can be sent without delay and those which should be postponed.
for (id, &them) in peer_epochs {
if msg.message.is_accepted(them, max_future_epochs) {
if msg.message.is_premature(them, max_future_epochs) {
deferred_msgs.push((id.clone(), msg.message.clone()));
} else if !msg.message.is_obsolete(them) {
passed_msgs
.push(Target::Node(id.clone()).message(msg.message.clone()));
} else if !msg.message.is_obsolete(them) {
deferred_msgs.push((id.clone(), msg.message.clone()));
}
}
}

View File

@ -24,7 +24,7 @@ use rand::Rng;
use hbbft::honey_badger::{Batch, HoneyBadger, MessageContent};
use hbbft::sender_queue::{self, SenderQueue, Step};
use hbbft::transaction_queue::TransactionQueue;
use hbbft::{threshold_decrypt, DistAlgorithm, Epoched, NetworkInfo, Target, TargetedMessage};
use hbbft::{threshold_decrypt, DistAlgorithm, NetworkInfo, Target, TargetedMessage};
use network::{
Adversary, MessageScheduler, MessageWithSender, NodeId, RandomAdversary, SilentAdversary,
@ -74,10 +74,10 @@ impl Adversary<UsizeHoneyBadger> for FaultyShareAdversary {
if sender_id < self.num_good {
if let TargetedMessage {
target: Target::All,
message,
message: sender_queue::Message::Algo(hb_msg),
} = msg
{
let epoch = message.epoch();
let epoch = hb_msg.epoch();
// Set the trigger to simulate decryption share messages.
self.share_triggers.entry(epoch).or_insert(true);
}