Remove Epoched bound from SenderQueueableMessage.

This commit is contained in:
Andreas Fackler 2018-11-08 12:03:48 +01:00 committed by Andreas Fackler
parent d0b96f2dc8
commit b3c63774a7
5 changed files with 46 additions and 71 deletions

View File

@ -9,7 +9,7 @@ use serde::{de::DeserializeOwned, Serialize};
use super::{
SenderQueue, SenderQueueableDistAlgorithm, SenderQueueableMessage, SenderQueueableOutput,
};
use {Contribution, DaStep, Epoched, NodeIdT};
use {Contribution, DaStep, NodeIdT};
use dynamic_honey_badger::{
Batch, Change, ChangeState, DynamicHoneyBadger, Error as DhbError, Message, NodeChange,
@ -32,22 +32,12 @@ where
}
}
impl<N: Rand> Epoched for Message<N> {
type Epoch = (u64, u64);
fn epoch(&self) -> (u64, u64) {
match *self {
Message::HoneyBadger(era, ref msg) => (era, msg.epoch()),
Message::KeyGen(era, _, _) => (era, 0),
Message::SignedVote(ref signed_vote) => (signed_vote.era(), 0),
}
}
}
impl<N> SenderQueueableMessage for Message<N>
where
N: Rand,
{
type Epoch = (u64, u64);
fn is_premature(&self, (them_era, them): (u64, u64), max_future_epochs: u64) -> bool {
match *self {
Message::HoneyBadger(era, ref msg) => {
@ -67,6 +57,14 @@ where
Message::SignedVote(ref signed_vote) => signed_vote.era() < them_era,
}
}
fn first_epoch(&self) -> (u64, u64) {
match *self {
Message::HoneyBadger(era, ref msg) => (era, msg.epoch()),
Message::KeyGen(era, _, _) => (era, 0),
Message::SignedVote(ref signed_vote) => (signed_vote.era(), 0),
}
}
}
impl<C, N> SenderQueueableDistAlgorithm for DynamicHoneyBadger<C, N>

View File

@ -15,18 +15,12 @@ where
}
}
impl<N: Rand> Epoched for Message<N> {
type Epoch = u64;
fn epoch(&self) -> u64 {
self.epoch()
}
}
impl<N> SenderQueueableMessage for Message<N>
where
N: Rand,
{
type Epoch = u64;
fn is_premature(&self, them: u64, max_future_epochs: u64) -> bool {
self.epoch() > them + max_future_epochs
}
@ -34,6 +28,10 @@ where
fn is_obsolete(&self, them: u64) -> bool {
self.epoch() < them
}
fn first_epoch(&self) -> u64 {
self.epoch()
}
}
impl<C, N> Epoched for HoneyBadger<C, N>

View File

@ -1,18 +1,18 @@
use rand::{Rand, Rng};
use serde_derive::{Deserialize, Serialize};
use Epoched;
use super::SenderQueueableMessage;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum Message<M: Epoched> {
EpochStarted(<M as Epoched>::Epoch),
pub enum Message<M: SenderQueueableMessage> {
EpochStarted(M::Epoch),
Algo(M),
}
impl<M> Rand for Message<M>
where
M: Epoched + Rand,
<M as Epoched>::Epoch: Rand,
M: SenderQueueableMessage + Rand,
M::Epoch: Rand,
{
fn rand<R: Rng>(rng: &mut R) -> Self {
let message_type = *rng.choose(&["epoch", "algo"]).unwrap();
@ -25,7 +25,7 @@ where
}
}
impl<M: Epoched> From<M> for Message<M> {
impl<M: SenderQueueableMessage> From<M> for Message<M> {
fn from(message: M) -> Self {
Message::Algo(message)
}

View File

@ -14,37 +14,39 @@ pub mod queueing_honey_badger;
use std::collections::BTreeMap;
use std::fmt::Debug;
use traits::EpochT;
use {DaStep, DistAlgorithm, Epoched, NodeIdT, Target};
pub use self::message::Message;
pub trait SenderQueueableMessage: Epoched {
pub trait SenderQueueableMessage {
type Epoch: EpochT;
/// Whether the message needs to be deferred.
fn is_premature(&self, them: <Self as Epoched>::Epoch, max_future_epochs: u64) -> bool;
fn is_premature(&self, them: Self::Epoch, max_future_epochs: u64) -> bool;
/// Whether the epoch of the message is behind `them`.
fn is_obsolete(&self, them: <Self as Epoched>::Epoch) -> bool;
fn is_obsolete(&self, them: Self::Epoch) -> bool;
/// Whether the message is neither obsolete nor premature.
fn is_accepted(&self, them: <Self as Epoched>::Epoch, max_future_epochs: u64) -> bool {
fn is_accepted(&self, them: Self::Epoch, max_future_epochs: u64) -> bool {
!self.is_premature(them, max_future_epochs) && !self.is_obsolete(them)
}
/// Returns the earliest epoch in which this message can be handled.
fn first_epoch(&self) -> Self::Epoch;
}
pub trait SenderQueueableOutput<N, M>
where
N: NodeIdT,
M: Epoched,
{
/// Returns an optional new node added with the batch. This node should be added to the set of
/// all nodes.
fn added_node(&self) -> Option<N>;
}
pub trait SenderQueueableDistAlgorithm: Epoched
where
Self: DistAlgorithm,
{
pub trait SenderQueueableDistAlgorithm: Epoched + DistAlgorithm {
/// The maximum number of subsequent future epochs that the `DistAlgorithm` is allowed to handle
/// messages for.
fn max_future_epochs(&self) -> u64;
@ -52,7 +54,7 @@ where
pub type OutgoingQueue<D> = BTreeMap<
<D as DistAlgorithm>::NodeId,
BTreeMap<<<D as DistAlgorithm>::Message as Epoched>::Epoch, Vec<<D as DistAlgorithm>::Message>>,
BTreeMap<<D as Epoched>::Epoch, Vec<<D as DistAlgorithm>::Message>>,
>;
/// An instance of `DistAlgorithm` wrapped with a queue of outgoing messages, that is, a sender
@ -65,7 +67,6 @@ pub type OutgoingQueue<D> = BTreeMap<
pub struct SenderQueue<D>
where
D: SenderQueueableDistAlgorithm,
D::Message: Epoched,
{
/// The managed `DistAlgorithm` instance.
algo: D,
@ -75,7 +76,7 @@ where
outgoing_queue: OutgoingQueue<D>,
/// The set of all remote nodes on the network including validator as well as non-validator
/// (observer) nodes together with their epochs as of the last communication.
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::Epoch>,
peer_epochs: BTreeMap<D::NodeId, D::Epoch>,
}
pub type Step<D> = ::DaStep<SenderQueue<D>>;
@ -83,7 +84,7 @@ pub type Step<D> = ::DaStep<SenderQueue<D>>;
impl<D> DistAlgorithm for SenderQueue<D>
where
D: SenderQueueableDistAlgorithm + Debug,
D::Message: Clone + SenderQueueableMessage + Epoched<Epoch = <D as Epoched>::Epoch>,
D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
D::NodeId: NodeIdT,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
{
@ -117,7 +118,7 @@ where
impl<D> SenderQueue<D>
where
D: SenderQueueableDistAlgorithm + Debug,
D::Message: Clone + SenderQueueableMessage + Epoched<Epoch = <D as Epoched>::Epoch>,
D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
D::NodeId: NodeIdT,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
{
@ -159,11 +160,7 @@ where
}
/// Handles an epoch start announcement.
fn handle_epoch_started(
&mut self,
sender_id: &D::NodeId,
epoch: <D::Message as Epoched>::Epoch,
) -> DaStep<Self> {
fn handle_epoch_started(&mut self, sender_id: &D::NodeId, epoch: D::Epoch) -> DaStep<Self> {
self.peer_epochs
.entry(sender_id.clone())
.and_modify(|e| {
@ -175,11 +172,7 @@ where
}
/// Processes an announcement of a new epoch update received from a remote node.
fn process_new_epoch(
&mut self,
sender_id: &D::NodeId,
epoch: <D::Message as Epoched>::Epoch,
) -> DaStep<Self> {
fn process_new_epoch(&mut self, sender_id: &D::NodeId, epoch: D::Epoch) -> DaStep<Self> {
let queue = match self.outgoing_queue.get_mut(sender_id) {
None => return DaStep::<Self>::default(),
Some(queue) => queue,
@ -235,7 +228,7 @@ where
self.outgoing_queue
.entry(id)
.or_default()
.entry(message.epoch())
.entry(message.first_epoch())
.or_default()
.push(message);
}
@ -252,17 +245,16 @@ where
pub struct SenderQueueBuilder<D>
where
D: SenderQueueableDistAlgorithm,
D::Message: Epoched,
{
algo: D,
outgoing_queue: OutgoingQueue<D>,
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::Epoch>,
peer_epochs: BTreeMap<D::NodeId, D::Epoch>,
}
impl<D> SenderQueueBuilder<D>
where
D: SenderQueueableDistAlgorithm + Debug,
D::Message: Clone + SenderQueueableMessage + Epoched<Epoch = <D as Epoched>::Epoch>,
D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
D::NodeId: NodeIdT,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
{
@ -273,9 +265,7 @@ where
SenderQueueBuilder {
algo,
outgoing_queue: BTreeMap::default(),
peer_epochs: peer_ids
.map(|id| (id, <D::Message as Epoched>::Epoch::default()))
.collect(),
peer_epochs: peer_ids.map(|id| (id, D::Epoch::default())).collect(),
}
}
@ -284,10 +274,7 @@ where
self
}
pub fn peer_epochs(
mut self,
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::Epoch>,
) -> Self {
pub fn peer_epochs(mut self, peer_epochs: BTreeMap<D::NodeId, D::Epoch>) -> Self {
self.peer_epochs = peer_epochs;
self
}

View File

@ -186,14 +186,6 @@ pub trait Epoched {
fn epoch(&self) -> Self::Epoch;
}
impl<M: Epoched, N> Epoched for TargetedMessage<M, N> {
type Epoch = <M as Epoched>::Epoch;
fn epoch(&self) -> Self::Epoch {
self.message.epoch()
}
}
/// An alias for the type of `Step` returned by `D`'s methods.
pub type DaStep<D> =
Step<<D as DistAlgorithm>::Message, <D as DistAlgorithm>::Output, <D as DistAlgorithm>::NodeId>;
@ -208,7 +200,7 @@ where
/// remaining messages can be sent to remote nodes without delay.
pub fn defer_messages(
&mut self,
peer_epochs: &BTreeMap<N, <M as Epoched>::Epoch>,
peer_epochs: &BTreeMap<N, M::Epoch>,
max_future_epochs: u64,
) -> Vec<(N, M)> {
let mut deferred_msgs: Vec<(N, M)> = Vec::new();