diff --git a/src/dynamic_honey_badger/dynamic_honey_badger.rs b/src/dynamic_honey_badger/dynamic_honey_badger.rs index b78fb75..3465209 100644 --- a/src/dynamic_honey_badger/dynamic_honey_badger.rs +++ b/src/dynamic_honey_badger/dynamic_honey_badger.rs @@ -21,7 +21,7 @@ use honey_badger::{self, HoneyBadger, Message as HbMessage}; use sync_key_gen::{Ack, AckOutcome, Part, PartOutcome, SyncKeyGen}; use threshold_decrypt::EncryptionSchedule; use util::{self, SubRng}; -use {Contribution, DistAlgorithm, NetworkInfo, NodeIdT, Target}; +use {Contribution, DistAlgorithm, Epoched, NetworkInfo, NodeIdT, Target}; /// A Honey Badger instance that can handle adding and removing nodes. #[derive(Derivative)] @@ -493,3 +493,15 @@ where write!(f, "{:?} DHB(era: {})", self.our_id(), self.era) } } + +impl Epoched for DynamicHoneyBadger +where + C: Contribution + Serialize + DeserializeOwned, + N: NodeIdT + Serialize + DeserializeOwned + Rand, +{ + type Epoch = (u64, u64); + + fn epoch(&self) -> (u64, u64) { + (self.era, self.honey_badger.epoch()) + } +} diff --git a/src/dynamic_honey_badger/mod.rs b/src/dynamic_honey_badger/mod.rs index 50c1bb4..59daf7b 100644 --- a/src/dynamic_honey_badger/mod.rs +++ b/src/dynamic_honey_badger/mod.rs @@ -72,7 +72,6 @@ mod dynamic_honey_badger; mod error; mod votes; -use std::cmp::Ordering; use std::collections::BTreeMap; use crypto::{PublicKey, PublicKeySet, Signature}; @@ -83,7 +82,7 @@ use self::votes::{SignedVote, VoteCounter}; use super::threshold_decrypt::EncryptionSchedule; use honey_badger::Message as HbMessage; use sync_key_gen::{Ack, Part, SyncKeyGen}; -use {Epoched, NodeIdT}; +use NodeIdT; pub use self::batch::Batch; pub use self::builder::DynamicHoneyBadgerBuilder; @@ -134,62 +133,6 @@ impl Message { } } -/// Dynamic Honey Badger epoch. It consists of an era and an epoch of Honey Badger that started in -/// that era. For messages originating from `DynamicHoneyBadger` as opposed to `HoneyBadger`, that -/// HoneyBadger epoch is `None`. -#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, Serialize, Deserialize)] -pub struct Epoch(pub(super) u64, pub(super) Option); - -/// The injection of linearizable epochs into `DynamicHoneyBadger` epochs. -impl From<(u64, u64)> for Epoch { - fn from((era, hb_epoch): (u64, u64)) -> Epoch { - Epoch(era, Some(hb_epoch)) - } -} - -impl PartialOrd for Epoch { - /// Partial ordering on epochs. For any `era` and `hb_epoch`, two epochs `Epoch(era, None)` and `Epoch(era, - /// Some(hb_epoch))` are incomparable. - fn partial_cmp(&self, other: &Epoch) -> Option { - let (&Epoch(a, b), &Epoch(c, d)) = (self, other); - if a < c { - Some(Ordering::Less) - } else if a > c { - Some(Ordering::Greater) - } else if b.is_none() && d.is_none() { - Some(Ordering::Equal) - } else if let (Some(b), Some(d)) = (b, d) { - Some(Ord::cmp(&b, &d)) - } else { - None - } - } -} - -impl Default for Epoch { - fn default() -> Epoch { - Epoch(0, Some(0)) - } -} - -impl Epoched for Message { - type Epoch = Epoch; - type LinEpoch = (u64, u64); - - fn epoch(&self) -> Epoch { - match *self { - Message::HoneyBadger(era, ref msg) => Epoch(era, Some(msg.epoch())), - Message::KeyGen(era, _, _) => Epoch(era, None), - Message::SignedVote(ref signed_vote) => Epoch(signed_vote.era(), None), - } - } - - fn linearizable_epoch(&self) -> Option<(u64, u64)> { - let Epoch(era, hb_epoch) = self.epoch(); - hb_epoch.map(|hb_epoch| (era, hb_epoch)) - } -} - /// The information a new node requires to join the network as an observer. It contains the state /// of voting and key generation after a specific epoch, so that the new node will be in sync if it /// joins in the next one. diff --git a/src/honey_badger/honey_badger.rs b/src/honey_badger/honey_badger.rs index c513ca2..b69cc0b 100644 --- a/src/honey_badger/honey_badger.rs +++ b/src/honey_badger/honey_badger.rs @@ -8,7 +8,7 @@ use serde::{de::DeserializeOwned, Serialize}; use super::epoch_state::EpochState; use super::{Batch, Error, ErrorKind, HoneyBadgerBuilder, Message, Result}; -use {util, Contribution, DistAlgorithm, Epoched, Fault, FaultKind, NetworkInfo, NodeIdT}; +use {util, Contribution, DistAlgorithm, Fault, FaultKind, NetworkInfo, NodeIdT}; pub use super::epoch_state::SubsetHandlingStrategy; use threshold_decrypt::EncryptionSchedule; @@ -40,19 +40,6 @@ pub struct HoneyBadger { pub(super) encryption_schedule: EncryptionSchedule, } -impl Epoched for HoneyBadger { - type Epoch = u64; - type LinEpoch = u64; - - fn epoch(&self) -> Self::Epoch { - self.epoch - } - - fn linearizable_epoch(&self) -> Option { - Some(self.epoch) - } -} - pub type Step = ::DaStep>; impl DistAlgorithm for HoneyBadger @@ -127,16 +114,16 @@ where return Err(ErrorKind::UnknownSender.into()); } let Message { epoch, content } = message; - if self.epoch <= epoch && epoch <= self.epoch + self.max_future_epochs { + if epoch > self.epoch + self.max_future_epochs { + Ok(Fault::new(sender_id.clone(), FaultKind::UnexpectedHbMessageEpoch).into()) + } else if epoch < self.epoch { + // The message is late; discard it. + Ok(Step::default()) + } else { let step = self .epoch_state_mut(epoch)? .handle_message_content(sender_id, content)?; Ok(step.join(self.try_output_batches()?)) - } else if epoch > self.epoch + self.max_future_epochs { - Ok(Fault::new(sender_id.clone(), FaultKind::UnexpectedHbMessageEpoch).into()) - } else { - // The message is late; discard it. - Ok(Step::default()) } } @@ -145,10 +132,17 @@ where !self.netinfo.is_validator() || self.has_input } + /// Returns the current encryption schedule that determines in which epochs contributions are + /// encrypted. pub fn get_encryption_schedule(&self) -> EncryptionSchedule { self.encryption_schedule } + /// Returns the current epoch. + pub fn epoch(&self) -> u64 { + self.epoch + } + /// Returns the number of validators from which we have already received a proposal for the /// current epoch. pub(crate) fn received_proposals(&self) -> usize { diff --git a/src/honey_badger/message.rs b/src/honey_badger/message.rs index 408c77e..a92c083 100644 --- a/src/honey_badger/message.rs +++ b/src/honey_badger/message.rs @@ -5,8 +5,6 @@ use serde_derive::{Deserialize, Serialize}; use subset; use threshold_decrypt; -use Epoched; - /// The content of a `HoneyBadger` message. It should be further annotated with an epoch. #[derive(Clone, Debug, Deserialize, Rand, Serialize)] pub enum MessageContent { @@ -35,15 +33,9 @@ pub struct Message { pub(super) content: MessageContent, } -impl Epoched for Message { - type Epoch = u64; - type LinEpoch = u64; - - fn epoch(&self) -> u64 { +impl Message { + /// Returns this message's Honey Badger epoch. + pub fn epoch(&self) -> u64 { self.epoch } - - fn linearizable_epoch(&self) -> Option { - Some(self.epoch) - } } diff --git a/src/queueing_honey_badger/mod.rs b/src/queueing_honey_badger/mod.rs index 126e5f0..7ede1bc 100644 --- a/src/queueing_honey_badger/mod.rs +++ b/src/queueing_honey_badger/mod.rs @@ -36,7 +36,7 @@ use dynamic_honey_badger::{self, Batch as DhbBatch, DynamicHoneyBadger, Message} use transaction_queue::TransactionQueue; use {util, Contribution, DistAlgorithm, NodeIdT}; -pub use dynamic_honey_badger::{Change, ChangeState, Epoch, Input, NodeChange}; +pub use dynamic_honey_badger::{Change, ChangeState, Input, NodeChange}; /// Queueing honey badger error variants. #[derive(Debug, Fail)] diff --git a/src/sender_queue/dynamic_honey_badger.rs b/src/sender_queue/dynamic_honey_badger.rs index 9fbc426..855cc05 100644 --- a/src/sender_queue/dynamic_honey_badger.rs +++ b/src/sender_queue/dynamic_honey_badger.rs @@ -7,13 +7,12 @@ use rand::Rand; use serde::{de::DeserializeOwned, Serialize}; use super::{ - SenderQueue, SenderQueueableDistAlgorithm, SenderQueueableEpoch, SenderQueueableMessage, - SenderQueueableOutput, + SenderQueue, SenderQueueableDistAlgorithm, SenderQueueableMessage, SenderQueueableOutput, }; use {Contribution, DaStep, Epoched, NodeIdT}; use dynamic_honey_badger::{ - Batch, Change, ChangeState, DynamicHoneyBadger, Epoch, Error as DhbError, Message, NodeChange, + Batch, Change, ChangeState, DynamicHoneyBadger, Error as DhbError, Message, NodeChange, }; impl SenderQueueableOutput> for Batch @@ -31,14 +30,16 @@ where None } } +} - fn next_epoch(&self) -> (u64, u64) { - let epoch = self.epoch(); - let era = self.era(); - if *self.change() == ChangeState::None { - (era, epoch - era + 1) - } else { - (epoch + 1, 0) +impl Epoched for Message { + type Epoch = (u64, u64); + + fn epoch(&self) -> (u64, u64) { + match *self { + Message::HoneyBadger(era, ref msg) => (era, msg.epoch()), + Message::KeyGen(era, _, _) => (era, 0), + Message::SignedVote(ref signed_vote) => (signed_vote.era(), 0), } } } @@ -47,37 +48,23 @@ impl SenderQueueableMessage for Message where N: Rand, { - fn is_accepted(&self, (them_era, them): (u64, u64), max_future_epochs: u64) -> bool { - let Epoch(era, us) = self.epoch(); - if era != them_era { - return false; - } - if let Some(us) = us { - them <= us && us <= them + max_future_epochs - } else { - true + fn is_premature(&self, (them_era, them): (u64, u64), max_future_epochs: u64) -> bool { + match *self { + Message::HoneyBadger(era, ref msg) => { + era > them_era || (era == them_era && msg.epoch() > them + max_future_epochs) + } + Message::KeyGen(era, _, _) => era > them_era, + Message::SignedVote(ref signed_vote) => signed_vote.era() > them_era, } } fn is_obsolete(&self, (them_era, them): (u64, u64)) -> bool { - let Epoch(era, us) = self.epoch(); - if era < them_era { - return true; - } - if let Some(us) = us { - era == them_era && us < them - } else { - false - } - } -} - -impl SenderQueueableEpoch for Epoch { - fn spanning_epochs(&self) -> Vec { - if let Epoch(era, Some(_)) = *self { - vec![Epoch(era, None)] - } else { - vec![] + match *self { + Message::HoneyBadger(era, ref msg) => { + era < them_era || (era == them_era && msg.epoch() < them) + } + Message::KeyGen(era, _, _) => era < them_era, + Message::SignedVote(ref signed_vote) => signed_vote.era() < them_era, } } } diff --git a/src/sender_queue/honey_badger.rs b/src/sender_queue/honey_badger.rs index 9529ce1..903e0a9 100644 --- a/src/sender_queue/honey_badger.rs +++ b/src/sender_queue/honey_badger.rs @@ -1,10 +1,7 @@ use rand::Rand; use serde::{de::DeserializeOwned, Serialize}; -use super::{ - SenderQueueableDistAlgorithm, SenderQueueableEpoch, SenderQueueableMessage, - SenderQueueableOutput, -}; +use super::{SenderQueueableDistAlgorithm, SenderQueueableMessage, SenderQueueableOutput}; use honey_badger::{Batch, HoneyBadger, Message}; use {Contribution, Epoched, NodeIdT}; @@ -16,9 +13,13 @@ where fn added_node(&self) -> Option { None } +} - fn next_epoch(&self) -> u64 { - self.epoch + 1 +impl Epoched for Message { + type Epoch = u64; + + fn epoch(&self) -> u64 { + self.epoch() } } @@ -26,9 +27,8 @@ impl SenderQueueableMessage for Message where N: Rand, { - fn is_accepted(&self, them: u64, max_future_epochs: u64) -> bool { - let our_epoch = self.epoch(); - them <= our_epoch && our_epoch <= them + max_future_epochs + fn is_premature(&self, them: u64, max_future_epochs: u64) -> bool { + self.epoch() > them + max_future_epochs } fn is_obsolete(&self, them: u64) -> bool { @@ -36,9 +36,15 @@ where } } -impl SenderQueueableEpoch for u64 { - fn spanning_epochs(&self) -> Vec { - vec![] +impl Epoched for HoneyBadger +where + C: Contribution + Serialize + DeserializeOwned, + N: NodeIdT + Rand, +{ + type Epoch = u64; + + fn epoch(&self) -> u64 { + self.epoch() } } diff --git a/src/sender_queue/message.rs b/src/sender_queue/message.rs index f86cb79..6c840c8 100644 --- a/src/sender_queue/message.rs +++ b/src/sender_queue/message.rs @@ -5,7 +5,7 @@ use Epoched; #[derive(Clone, Debug, Deserialize, Serialize)] pub enum Message { - EpochStarted(::LinEpoch), + EpochStarted(::Epoch), Algo(M), } @@ -13,7 +13,6 @@ impl Rand for Message where M: Epoched + Rand, ::Epoch: Rand, - ::LinEpoch: Rand, { fn rand(rng: &mut R) -> Self { let message_type = *rng.choose(&["epoch", "algo"]).unwrap(); @@ -26,29 +25,6 @@ where } } -impl Epoched for Message -where - M: Epoched, - ::Epoch: From<::LinEpoch>, -{ - type Epoch = ::Epoch; - type LinEpoch = ::LinEpoch; - - fn epoch(&self) -> Self::Epoch { - match self { - Message::EpochStarted(epoch) => ::Epoch::from(*epoch), - Message::Algo(message) => message.epoch(), - } - } - - fn linearizable_epoch(&self) -> Option { - match self { - Message::EpochStarted(epoch) => Some(*epoch), - Message::Algo(message) => message.linearizable_epoch(), - } - } -} - impl From for Message { fn from(message: M) -> Self { Message::Algo(message) diff --git a/src/sender_queue/mod.rs b/src/sender_queue/mod.rs index d8d3e84..df08bba 100644 --- a/src/sender_queue/mod.rs +++ b/src/sender_queue/mod.rs @@ -14,19 +14,21 @@ pub mod queueing_honey_badger; use std::collections::BTreeMap; use std::fmt::Debug; -use rand::Rand; -use serde::{de::DeserializeOwned, Serialize}; - use {DaStep, DistAlgorithm, Epoched, NodeIdT, Target}; pub use self::message::Message; pub trait SenderQueueableMessage: Epoched { - /// Whether the message is accepted in epoch `them`. - fn is_accepted(&self, them: ::LinEpoch, max_future_epochs: u64) -> bool; + /// Whether the message needs to be deferred. + fn is_premature(&self, them: ::Epoch, max_future_epochs: u64) -> bool; /// Whether the epoch of the message is behind `them`. - fn is_obsolete(&self, them: ::LinEpoch) -> bool; + fn is_obsolete(&self, them: ::Epoch) -> bool; + + /// Whether the message is neither obsolete nor premature. + fn is_accepted(&self, them: ::Epoch, max_future_epochs: u64) -> bool { + !self.is_premature(them, max_future_epochs) && !self.is_obsolete(them) + } } pub trait SenderQueueableOutput @@ -37,29 +39,9 @@ where /// Returns an optional new node added with the batch. This node should be added to the set of /// all nodes. fn added_node(&self) -> Option; - - /// Computes the next epoch after the `DynamicHoneyBadger` epoch of the batch. - fn next_epoch(&self) -> ::LinEpoch; } -pub trait SenderQueueableEpoch -where - Self: Sized, -{ - /// A _spanning epoch_ of an epoch `e` is an epoch `e0` such that - /// - /// - `e` and `e0` are incomparable by the partial ordering on epochs and - /// - /// - the duration of `e0` is at least that of `e`. - /// - /// Returned is a list of spanning epochs for the given epoch. - /// - /// For example, any `DynamicHoneyBadger` epoch `Epoch((x, Some(y)))` has a unique spanning - /// epoch `Epoch((x, None))`. In turn, no epoch `Epoch((x, None))` has a spanning epoch. - fn spanning_epochs(&self) -> Vec; -} - -pub trait SenderQueueableDistAlgorithm +pub trait SenderQueueableDistAlgorithm: Epoched where Self: DistAlgorithm, { @@ -69,11 +51,8 @@ where } pub type OutgoingQueue = BTreeMap< - ( - ::NodeId, - <::Message as Epoched>::Epoch, - ), - Vec<::Message>, + ::NodeId, + BTreeMap<<::Message as Epoched>::Epoch, Vec<::Message>>, >; /// An instance of `DistAlgorithm` wrapped with a queue of outgoing messages, that is, a sender @@ -86,32 +65,27 @@ pub type OutgoingQueue = BTreeMap< pub struct SenderQueue where D: SenderQueueableDistAlgorithm, - D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned, - D::NodeId: NodeIdT + Rand, - D::Output: SenderQueueableOutput, + D::Message: Epoched, { /// The managed `DistAlgorithm` instance. algo: D, /// Our node ID. our_id: D::NodeId, - /// Current linearizable epoch of the managed `DistAlgorithm`. - lin_epoch: ::LinEpoch, /// Messages that couldn't be handled yet by remote nodes. outgoing_queue: OutgoingQueue, /// The set of all remote nodes on the network including validator as well as non-validator /// (observer) nodes together with their epochs as of the last communication. - peer_epochs: BTreeMap::LinEpoch>, + peer_epochs: BTreeMap::Epoch>, } pub type Step = ::DaStep>; impl DistAlgorithm for SenderQueue where - D: SenderQueueableDistAlgorithm + Debug + Send + Sync, - D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned, - D::NodeId: NodeIdT + Rand, + D: SenderQueueableDistAlgorithm + Debug, + D::Message: Clone + SenderQueueableMessage + Epoched::Epoch>, + D::NodeId: NodeIdT, D::Output: SenderQueueableOutput, - ::Epoch: SenderQueueableEpoch + From<::LinEpoch>, { type NodeId = D::NodeId; type Input = D::Input; @@ -142,11 +116,10 @@ where impl SenderQueue where - D: SenderQueueableDistAlgorithm + Debug + Send + Sync, - D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned, - D::NodeId: NodeIdT + Rand, + D: SenderQueueableDistAlgorithm + Debug, + D::Message: Clone + SenderQueueableMessage + Epoched::Epoch>, + D::NodeId: NodeIdT, D::Output: SenderQueueableOutput, - ::Epoch: SenderQueueableEpoch + From<::LinEpoch>, { /// Returns a new `SenderQueueBuilder` configured to manage a given `DynamicHoneyBadger` /// instance. @@ -167,7 +140,7 @@ where message: Message, ) -> Result, D::Error> { match message { - Message::EpochStarted(lin_epoch) => Ok(self.handle_epoch_started(sender_id, lin_epoch)), + Message::EpochStarted(epoch) => Ok(self.handle_epoch_started(sender_id, epoch)), Message::Algo(msg) => self.handle_message_content(sender_id, msg), } } @@ -179,7 +152,7 @@ where F: FnOnce(&mut D) -> Result, D::Error>, { let mut step = f(&mut self.algo)?; - let mut sender_queue_step = self.update_lin_epoch(&step); + let mut sender_queue_step = self.update_epoch(&step); self.defer_messages(&mut step); sender_queue_step.extend(step.map(|output| output, Message::from)); Ok(sender_queue_step) @@ -189,34 +162,16 @@ where fn handle_epoch_started( &mut self, sender_id: &D::NodeId, - lin_epoch: ::LinEpoch, + epoch: ::Epoch, ) -> DaStep { self.peer_epochs .entry(sender_id.clone()) .and_modify(|e| { - if *e < lin_epoch { - *e = lin_epoch; + if *e < epoch { + *e = epoch; } - }).or_insert(lin_epoch); - self.remove_earlier_messages(sender_id, ::Epoch::from(lin_epoch)); - self.process_new_epoch(sender_id, ::Epoch::from(lin_epoch)) - } - - /// Removes all messages queued for the remote node from epochs upto `epoch`. - fn remove_earlier_messages( - &mut self, - sender_id: &D::NodeId, - epoch: ::Epoch, - ) { - let earlier_keys: Vec<_> = self - .outgoing_queue - .keys() - .cloned() - .filter(|(id, this_epoch)| id == sender_id && *this_epoch < epoch) - .collect(); - for key in earlier_keys { - self.outgoing_queue.remove(&key); - } + }).or_insert(epoch); + self.process_new_epoch(sender_id, epoch) } /// Processes an announcement of a new epoch update received from a remote node. @@ -225,24 +180,22 @@ where sender_id: &D::NodeId, epoch: ::Epoch, ) -> DaStep { - // Send any HB messages for the HB epoch. - let mut ready_messages = self - .outgoing_queue - .remove(&(sender_id.clone(), epoch)) - .unwrap_or_default(); - for u in epoch.spanning_epochs() { - // Send any DHB messages for the DHB era. - ready_messages.extend( - self.outgoing_queue - .remove(&(sender_id.clone(), u)) - .unwrap_or_default(), - ); - } - Step::::from( - ready_messages - .into_iter() - .map(|msg| Target::Node(sender_id.clone()).message(Message::Algo(msg))), - ) + let queue = match self.outgoing_queue.get_mut(sender_id) { + None => return DaStep::::default(), + Some(queue) => queue, + }; + let earlier_keys: Vec<_> = queue + .keys() + .cloned() + .take_while(|this_epoch| *this_epoch <= epoch) + .collect(); + earlier_keys + .into_iter() + .filter_map(|key| queue.remove(&key)) + .flatten() + .filter(|msg| !msg.is_obsolete(epoch)) + .map(|msg| Target::Node(sender_id.clone()).message(Message::Algo(msg))) + .into() } /// Handles a Honey Badger algorithm message in a given epoch. @@ -255,28 +208,20 @@ where } /// Updates the current Honey Badger epoch. - fn update_lin_epoch(&mut self, step: &DaStep) -> DaStep { - // Look up `DynamicHoneyBadger` epoch updates and collect any added peers. - let new_epoch = step.output.iter().fold(self.lin_epoch, |lin_epoch, batch| { - let max_epoch = lin_epoch.max(batch.next_epoch()); - if let Some(node) = batch.added_node() { - if &node != self.our_id() { - self.peer_epochs - .entry(node) - .or_insert_with(::LinEpoch::default); - } - } - max_epoch - }); - if new_epoch != self.lin_epoch { - self.lin_epoch = new_epoch; - // Announce the new epoch. - Target::All - .message(Message::EpochStarted(self.lin_epoch)) - .into() - } else { - Step::::default() + fn update_epoch(&mut self, step: &DaStep) -> DaStep { + if step.output.is_empty() { + return Step::::default(); } + // Look up `DynamicHoneyBadger` epoch updates and collect any added peers. + for node in step.output.iter().filter_map(|batch| batch.added_node()) { + if &node != self.our_id() { + self.peer_epochs.entry(node).or_default(); + } + } + // Announce the new epoch. + Target::All + .message(Message::EpochStarted(self.algo.epoch())) + .into() } /// Removes any messages to nodes at earlier epochs from the given `Step`. This may involve @@ -287,10 +232,11 @@ where let max_future_epochs = self.algo.max_future_epochs(); // Append the deferred messages onto the queues. for (id, message) in step.defer_messages(&self.peer_epochs, max_future_epochs) { - let epoch = message.epoch(); self.outgoing_queue - .entry((id, epoch)) - .or_insert_with(Vec::new) + .entry(id) + .or_default() + .entry(message.epoch()) + .or_default() .push(message); } } @@ -309,18 +255,16 @@ where D::Message: Epoched, { algo: D, - lin_epoch: ::LinEpoch, outgoing_queue: OutgoingQueue, - peer_epochs: BTreeMap::LinEpoch>, + peer_epochs: BTreeMap::Epoch>, } impl SenderQueueBuilder where - D: SenderQueueableDistAlgorithm + Debug + Send + Sync, - D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned, - D::NodeId: NodeIdT + Rand, + D: SenderQueueableDistAlgorithm + Debug, + D::Message: Clone + SenderQueueableMessage + Epoched::Epoch>, + D::NodeId: NodeIdT, D::Output: SenderQueueableOutput, - ::Epoch: SenderQueueableEpoch + From<::LinEpoch>, { pub fn new(algo: D, peer_ids: I) -> Self where @@ -328,19 +272,13 @@ where { SenderQueueBuilder { algo, - lin_epoch: ::LinEpoch::default(), outgoing_queue: BTreeMap::default(), peer_epochs: peer_ids - .map(|id| (id, ::LinEpoch::default())) + .map(|id| (id, ::Epoch::default())) .collect(), } } - pub fn lin_epoch(mut self, lin_epoch: ::LinEpoch) -> Self { - self.lin_epoch = lin_epoch; - self - } - pub fn outgoing_queue(mut self, outgoing_queue: OutgoingQueue) -> Self { self.outgoing_queue = outgoing_queue; self @@ -348,22 +286,21 @@ where pub fn peer_epochs( mut self, - peer_epochs: BTreeMap::LinEpoch>, + peer_epochs: BTreeMap::Epoch>, ) -> Self { self.peer_epochs = peer_epochs; self } pub fn build(self, our_id: D::NodeId) -> (SenderQueue, DaStep>) { - let lin_epoch = ::LinEpoch::default(); + let epoch = self.algo.epoch(); let sq = SenderQueue { algo: self.algo, our_id, - lin_epoch: self.lin_epoch, outgoing_queue: self.outgoing_queue, peer_epochs: self.peer_epochs, }; - let step = Target::All.message(Message::EpochStarted(lin_epoch)).into(); + let step = Target::All.message(Message::EpochStarted(epoch)).into(); (sq, step) } } diff --git a/src/sender_queue/queueing_honey_badger.rs b/src/sender_queue/queueing_honey_badger.rs index 0363ec8..4273603 100644 --- a/src/sender_queue/queueing_honey_badger.rs +++ b/src/sender_queue/queueing_honey_badger.rs @@ -9,7 +9,20 @@ use serde::{de::DeserializeOwned, Serialize}; use super::{SenderQueue, SenderQueueableDistAlgorithm}; use queueing_honey_badger::{Change, Error as QhbError, QueueingHoneyBadger}; use transaction_queue::TransactionQueue; -use {Contribution, DaStep, NodeIdT}; +use {Contribution, DaStep, Epoched, NodeIdT}; + +impl Epoched for QueueingHoneyBadger +where + T: Contribution + Serialize + DeserializeOwned + Clone, + N: NodeIdT + Serialize + DeserializeOwned + Rand, + Q: TransactionQueue, +{ + type Epoch = (u64, u64); + + fn epoch(&self) -> (u64, u64) { + self.dyn_hb().epoch() + } +} impl SenderQueueableDistAlgorithm for QueueingHoneyBadger where diff --git a/src/traits.rs b/src/traits.rs index d252704..51c1f78 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -6,7 +6,6 @@ use std::hash::Hash; use std::iter::once; use failure::Fail; -use rand::Rand; use serde::{de::DeserializeOwned, Serialize}; use fault_log::{Fault, FaultLog}; @@ -180,30 +179,19 @@ where /// notion of _epoch_. This interface summarizes the properties that are essential for the message /// sender queue. pub trait Epoched { - /// Type of epoch. It is not required to be totally ordered. + /// Type of epoch. type Epoch: EpochT; - /// A subtype of `Epoch` which contains sets of "linearizable epochs" such that each of those - /// sets is totally ordered and each has a least element. - type LinEpoch: EpochT; /// Returns the object's epoch number. fn epoch(&self) -> Self::Epoch; - - /// Returns the object's linearizable epoch number if the object's epoch can be linearized. - fn linearizable_epoch(&self) -> Option; } impl Epoched for TargetedMessage { type Epoch = ::Epoch; - type LinEpoch = ::LinEpoch; fn epoch(&self) -> Self::Epoch { self.message.epoch() } - - fn linearizable_epoch(&self) -> Option { - self.message.linearizable_epoch() - } } /// An alias for the type of `Step` returned by `D`'s methods. @@ -212,15 +200,15 @@ pub type DaStep = impl<'i, M, O, N> Step where - N: NodeIdT + Rand, - M: 'i + Clone + SenderQueueableMessage + Serialize + DeserializeOwned, + N: NodeIdT, + M: 'i + Clone + SenderQueueableMessage, { /// Removes and returns any messages that are not yet accepted by remote nodes according to the /// mapping `remote_epochs`. This way the returned messages are postponed until later, and the /// remaining messages can be sent to remote nodes without delay. pub fn defer_messages( &mut self, - peer_epochs: &BTreeMap::LinEpoch>, + peer_epochs: &BTreeMap::Epoch>, max_future_epochs: u64, ) -> Vec<(N, M)> { let mut deferred_msgs: Vec<(N, M)> = Vec::new(); @@ -229,10 +217,10 @@ where match msg.target.clone() { Target::Node(id) => { if let Some(&them) = peer_epochs.get(&id) { - if msg.message.is_accepted(them, max_future_epochs) { - passed_msgs.push(msg); - } else if !msg.message.is_obsolete(them) { + if msg.message.is_premature(them, max_future_epochs) { deferred_msgs.push((id, msg.message)); + } else if !msg.message.is_obsolete(them) { + passed_msgs.push(msg); } } } @@ -246,11 +234,11 @@ where // The `Target::All` message is split into two sets of point messages: those // which can be sent without delay and those which should be postponed. for (id, &them) in peer_epochs { - if msg.message.is_accepted(them, max_future_epochs) { + if msg.message.is_premature(them, max_future_epochs) { + deferred_msgs.push((id.clone(), msg.message.clone())); + } else if !msg.message.is_obsolete(them) { passed_msgs .push(Target::Node(id.clone()).message(msg.message.clone())); - } else if !msg.message.is_obsolete(them) { - deferred_msgs.push((id.clone(), msg.message.clone())); } } } diff --git a/tests/honey_badger.rs b/tests/honey_badger.rs index ccaeb57..3b7fa67 100644 --- a/tests/honey_badger.rs +++ b/tests/honey_badger.rs @@ -24,7 +24,7 @@ use rand::Rng; use hbbft::honey_badger::{Batch, HoneyBadger, MessageContent}; use hbbft::sender_queue::{self, SenderQueue, Step}; use hbbft::transaction_queue::TransactionQueue; -use hbbft::{threshold_decrypt, DistAlgorithm, Epoched, NetworkInfo, Target, TargetedMessage}; +use hbbft::{threshold_decrypt, DistAlgorithm, NetworkInfo, Target, TargetedMessage}; use network::{ Adversary, MessageScheduler, MessageWithSender, NodeId, RandomAdversary, SilentAdversary, @@ -74,10 +74,10 @@ impl Adversary for FaultyShareAdversary { if sender_id < self.num_good { if let TargetedMessage { target: Target::All, - message, + message: sender_queue::Message::Algo(hb_msg), } = msg { - let epoch = message.epoch(); + let epoch = hb_msg.epoch(); // Set the trigger to simulate decryption share messages. self.share_triggers.entry(epoch).or_insert(true); }