work on review comments

This commit is contained in:
Vladimir Komendantskiy 2018-11-01 18:18:08 +00:00
parent ee46dd4b81
commit a8586efc81
9 changed files with 67 additions and 95 deletions

View File

@ -376,7 +376,7 @@ impl EpochInfo {
let txs = batch.iter().unique().count();
println!(
"{:>5} {:6} {:6} {:5} {:9} {:>9}B",
batch.seqnum().to_string().cyan(),
batch.epoch().to_string().cyan(),
min_t.as_secs() * 1000 + u64::from(max_t.subsec_nanos()) / 1_000_000,
max_t.as_secs() * 1000 + u64::from(max_t.subsec_nanos()) / 1_000_000,
txs,
@ -397,7 +397,7 @@ fn simulate_honey_badger(mut network: TestNetwork<QHB>) {
let mut epochs = Vec::new();
while let Some(id) = network.step() {
for &(time, ref batch) in &network.nodes[&id].outputs {
let epoch = batch.seqnum() as usize;
let epoch = batch.epoch() as usize;
if epochs.len() <= epoch {
epochs.resize(epoch + 1, EpochInfo::default());
}
@ -437,14 +437,18 @@ fn main() {
.map(|_| Transaction::new(args.flag_tx_size))
.collect();
let new_honey_badger = |netinfo: NetworkInfo<NodeId>| {
let dhb = DynamicHoneyBadger::builder().build(netinfo.clone());
let our_id = *netinfo.our_id();
let peer_ids: Vec<_> = netinfo
.all_ids()
.filter(|&&them| them != our_id)
.cloned()
.collect();
let dhb = DynamicHoneyBadger::builder().build(netinfo);
let (qhb, qhb_step) = QueueingHoneyBadger::builder(dhb)
.batch_size(args.flag_b)
.build_with_transactions(txs.clone(), rand::thread_rng().gen::<Isaac64Rng>())
.expect("instantiate QueueingHoneyBadger");
let our_id = *netinfo.our_id();
let peer_ids = netinfo.all_ids().filter(|&&them| them != our_id).cloned();
let (sq, mut step) = SenderQueue::builder(qhb, peer_ids).build(our_id);
let (sq, mut step) = SenderQueue::builder(qhb, peer_ids.into_iter()).build(our_id);
step.extend_with(qhb_step, Message::from);
(sq, step)
};

View File

@ -2,14 +2,14 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use super::EncryptionSchedule;
use super::{ChangeState, Epoch, JoinPlan};
use {Epoched, NetworkInfo, NodeIdT};
use super::{ChangeState, JoinPlan};
use {NetworkInfo, NodeIdT};
/// A batch of transactions the algorithm has output.
#[derive(Clone, Debug)]
pub struct Batch<C, N> {
/// The sequence number: there is exactly one batch in each epoch.
pub(super) seqnum: u64,
pub(super) epoch: u64,
/// The current `DynamicHoneyBadger` era.
pub(super) era: u64,
/// The user contributions committed in this epoch.
@ -23,25 +23,10 @@ pub struct Batch<C, N> {
pub(super) encryption_schedule: EncryptionSchedule,
}
impl<C, N: NodeIdT> Epoched for Batch<C, N> {
type Epoch = Epoch;
/// Returns the **next** `DynamicHoneyBadger` epoch after the sequential epoch of the batch.
fn epoch(&self) -> Epoch {
let seqnum = self.seqnum;
let era = self.era;
if self.change == ChangeState::None {
Epoch(era, Some(seqnum - era + 1))
} else {
Epoch(seqnum + 1, Some(0))
}
}
}
impl<C, N: NodeIdT> Batch<C, N> {
/// Returns the linear epoch of this `DynamicHoneyBadger` batch.
pub fn seqnum(&self) -> u64 {
self.seqnum
pub fn epoch(&self) -> u64 {
self.epoch
}
/// Returns the `DynamicHoneyBadger` era of the batch.
@ -112,7 +97,7 @@ impl<C, N: NodeIdT> Batch<C, N> {
return None;
}
Some(JoinPlan {
era: self.seqnum + 1,
era: self.epoch + 1,
change: self.change.clone(),
pub_key_set: self.netinfo.public_key_set().clone(),
pub_keys: self.netinfo.public_key_map().clone(),
@ -126,7 +111,7 @@ impl<C, N: NodeIdT> Batch<C, N> {
where
C: PartialEq,
{
self.seqnum == other.seqnum
self.epoch == other.epoch
&& self.era == other.era
&& self.contributions == other.contributions
&& self.change == other.change

View File

@ -246,7 +246,7 @@ where
let output = step.extend_with(hb_step, |hb_msg| Message::HoneyBadger(self.era, hb_msg));
for hb_batch in output {
let batch_era = self.era;
let batch_seqnum = hb_batch.epoch + batch_era;
let batch_epoch = hb_batch.epoch + batch_era;
let mut batch_contributions = BTreeMap::new();
// Add the user transactions to `batch` and handle votes and DKG messages.
@ -280,14 +280,14 @@ where
// If DKG completed, apply the change, restart Honey Badger, and inform the user.
debug!("{}: DKG for {:?} complete!", self, kgs.change);
self.netinfo = kgs.key_gen.into_network_info()?;
self.restart_honey_badger(batch_seqnum + 1, None);
self.restart_honey_badger(batch_epoch + 1, None);
ChangeState::Complete(Change::NodeChange(kgs.change))
} else if let Some(change) = self.vote_counter.compute_winner().cloned() {
// If there is a new change, restart DKG. Inform the user about the current change.
step.extend(match &change {
Change::NodeChange(change) => self.update_key_gen(batch_seqnum + 1, &change)?,
Change::NodeChange(change) => self.update_key_gen(batch_epoch + 1, &change)?,
Change::EncryptionSchedule(schedule) => {
self.update_encryption_schedule(batch_seqnum + 1, *schedule)?
self.update_encryption_schedule(batch_epoch + 1, *schedule)?
}
});
match change {
@ -298,7 +298,7 @@ where
ChangeState::None
};
step.output.push(Batch {
seqnum: batch_seqnum,
epoch: batch_epoch,
era: batch_era,
change,
netinfo: Arc::new(self.netinfo.clone()),

View File

@ -24,8 +24,14 @@ where
}
}
fn convert_epoch(&self) -> Epoch {
self.epoch()
fn next_epoch(&self) -> Epoch {
let epoch = self.epoch;
let era = self.era;
if self.change == ChangeState::None {
Epoch(era, Some(epoch - era + 1))
} else {
Epoch(epoch + 1, Some(0))
}
}
}
@ -42,6 +48,7 @@ where
(Some(us), Some(them)) => them <= us && us <= them + max_future_epochs,
(None, Some(_)) => true,
(_, None) => {
// TODO: return a Fault.
error!("Peer's Honey Badger epoch undefined");
false
}

View File

@ -1,6 +1,6 @@
use std::collections::BTreeMap;
use {Epoched, NodeIdT};
use NodeIdT;
/// A batch of contributions the algorithm has output.
#[derive(Clone, Debug)]
@ -9,15 +9,6 @@ pub struct Batch<C, N> {
pub contributions: BTreeMap<N, C>,
}
impl<C, N: NodeIdT> Epoched for Batch<C, N> {
type Epoch = u64;
/// Returns the **next** `HoneyBadger` epoch after the sequential epoch of the batch.
fn epoch(&self) -> u64 {
self.epoch + 1
}
}
impl<C, N: NodeIdT> Batch<C, N> {
/// Returns an iterator over references to all transactions included in the batch.
pub fn iter<'a>(&'a self) -> impl Iterator<Item = <&'a C as IntoIterator>::Item>

View File

@ -17,8 +17,8 @@ where
None
}
fn convert_epoch(&self) -> u64 {
self.epoch()
fn next_epoch(&self) -> u64 {
self.epoch + 1
}
}

View File

@ -25,7 +25,7 @@ pub trait SenderQueueableMessage: Epoched {
fn is_obsolete(&self, them: <Self as Epoched>::Epoch) -> bool;
}
pub trait SenderQueueableOutput<N, M>: Epoched
pub trait SenderQueueableOutput<N, M>
where
N: NodeIdT,
M: Epoched,
@ -34,8 +34,8 @@ where
/// all nodes.
fn added_node(&self) -> Option<N>;
/// Performs type conversion of the batch epoch info into a fixed epoch type.
fn convert_epoch(&self) -> <M as Epoched>::Epoch;
/// Computes the next epoch after the `DynamicHoneyBadger` epoch of the batch.
fn next_epoch(&self) -> <M as Epoched>::Epoch;
}
pub trait SenderQueueableEpoch
@ -250,7 +250,7 @@ where
fn update_epoch(&mut self, step: &::Step<D>) -> Step<D> {
// Look up `DynamicHoneyBadger` epoch updates and collect any added peers.
let new_epoch = step.output.iter().fold(self.epoch, |epoch, batch| {
let max_epoch = epoch.max(batch.convert_epoch());
let max_epoch = epoch.max(batch.next_epoch());
if let Some(node) = batch.added_node() {
if &node != self.our_id() {
self.peer_epochs

View File

@ -1,6 +1,6 @@
//! Common supertraits for distributed algorithms.
use std::collections::{BTreeMap, BTreeSet};
use std::collections::BTreeMap;
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::iter::once;
@ -223,7 +223,6 @@ where
<D as DistAlgorithm>::NodeId: NodeIdT + Rand,
<D as DistAlgorithm>::Message:
'i + Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
<D as DistAlgorithm>::Output: Epoched,
{
/// Removes and returns any messages that are not yet accepted by remote nodes according to the
/// mapping `remote_epochs`. This way the returned messages are postponed until later, and the
@ -237,59 +236,46 @@ where
<D as DistAlgorithm>::NodeId: 'i,
{
let messages = &mut self.messages;
let (mut passed_msgs, failed_msgs): (Vec<_>, Vec<_>) =
messages
.drain(..)
.partition(|TargetedMessage { target, message }| match target {
let pass =
|TargetedMessage { target, message }: &TargetedMessage<D::Message, D::NodeId>| {
match target {
Target::All => peer_epochs
.values()
.all(|&them| message.is_accepted(them, max_future_epochs)),
Target::Node(id) => peer_epochs
.get(&id)
.map_or(false, |&them| message.is_accepted(them, max_future_epochs)),
});
}
};
// `Target::All` messages contained in the result of the partitioning are analyzed further
// and each split into two sets of point messages: those which can be sent without delay and
// those which should be postponed.
let remote_nodes: BTreeSet<&D::NodeId> = peer_epochs.keys().collect();
let mut deferred_msgs: Vec<(D::NodeId, D::Message)> = Vec::new();
for msg in failed_msgs {
let m = msg.message;
match msg.target {
Target::Node(id) => {
let defer = {
let lagging = |&them| {
!(m.is_accepted(them, max_future_epochs) || m.is_obsolete(them))
let mut passed_msgs: Vec<_> = Vec::new();
for msg in messages.drain(..) {
if pass(&msg) {
passed_msgs.push(msg);
} else {
let m = msg.message;
match msg.target {
Target::Node(ref id) => {
let defer = {
let lagging = |&them| {
!(m.is_accepted(them, max_future_epochs) || m.is_obsolete(them))
};
peer_epochs.get(&id).map_or(true, lagging)
};
peer_epochs.get(&id).map_or(true, lagging)
};
if defer {
deferred_msgs.push((id, m));
if defer {
deferred_msgs.push((id.clone(), m));
}
}
}
Target::All => {
let isnt_earlier_epoch =
|&them| m.is_accepted(them, max_future_epochs) || m.is_obsolete(them);
let lagging = |them| !isnt_earlier_epoch(them);
let accepts = |&them| m.is_accepted(them, max_future_epochs);
let accepting_nodes: BTreeSet<&D::NodeId> = peer_epochs
.iter()
.filter(|(_, them)| accepts(them))
.map(|(id, _)| id)
.collect();
let non_lagging_nodes: BTreeSet<&D::NodeId> = peer_epochs
.iter()
.filter(|(_, them)| isnt_earlier_epoch(them))
.map(|(id, _)| id)
.collect();
for &id in &accepting_nodes {
passed_msgs.push(Target::Node(id.clone()).message(m.clone()));
}
let lagging_nodes: BTreeSet<_> =
remote_nodes.difference(&non_lagging_nodes).collect();
for &id in lagging_nodes {
if peer_epochs.get(id).map_or(true, lagging) {
deferred_msgs.push((id.clone(), m.clone()));
Target::All => {
for (id, &them) in peer_epochs {
if m.is_accepted(them, max_future_epochs) {
passed_msgs.push(Target::Node(id.clone()).message(m.clone()));
} else if !m.is_obsolete(them) {
deferred_msgs.push((id.clone(), m.clone()));
}
}
}
}

View File

@ -11,7 +11,6 @@ use std::{collections, time};
use hbbft::dynamic_honey_badger::{Change, ChangeState, DynamicHoneyBadger, Input, NodeChange};
use hbbft::sender_queue::SenderQueue;
use hbbft::Epoched;
use net::adversary::ReorderingAdversary;
use net::proptest::{gen_seed, NetworkDimension, TestRng, TestRngSeed};
use net::{NetBuilder, NewNodeInfo};