sender queue implementation

Vladimir Komendantskiy 2018-10-25 16:07:52 +01:00
parent 3c915cd4ff
commit ee46dd4b81
23 changed files with 966 additions and 167 deletions

View File

@ -26,6 +26,7 @@ use signifix::{metric, TryFrom};
use hbbft::dynamic_honey_badger::DynamicHoneyBadger;
use hbbft::queueing_honey_badger::{Batch, QueueingHoneyBadger};
use hbbft::sender_queue::{Message, SenderQueue};
use hbbft::{DistAlgorithm, NetworkInfo, Step, Target};
const VERSION: &str = env!("CARGO_PKG_VERSION");
@ -347,6 +348,8 @@ struct EpochInfo {
nodes: BTreeMap<NodeId, (Duration, Batch<Transaction, NodeId>)>,
}
type QHB = SenderQueue<QueueingHoneyBadger<Transaction, NodeId, Vec<Transaction>>>;
impl EpochInfo {
/// Adds a batch to this epoch. Prints information if the epoch is complete.
fn add(
@ -354,7 +357,7 @@ impl EpochInfo {
id: NodeId,
time: Duration,
batch: &Batch<Transaction, NodeId>,
network: &TestNetwork<QueueingHoneyBadger<Transaction, NodeId, Vec<Transaction>>>,
network: &TestNetwork<QHB>,
) {
if self.nodes.contains_key(&id) {
return;
@ -373,7 +376,7 @@ impl EpochInfo {
let txs = batch.iter().unique().count();
println!(
"{:>5} {:6} {:6} {:5} {:9} {:>9}B",
batch.epoch().to_string().cyan(),
batch.seqnum().to_string().cyan(),
min_t.as_secs() * 1000 + u64::from(max_t.subsec_nanos()) / 1_000_000,
max_t.as_secs() * 1000 + u64::from(max_t.subsec_nanos()) / 1_000_000,
txs,
@ -385,9 +388,7 @@ impl EpochInfo {
}
/// Proposes `num_txs` values and expects nodes to output and order them.
fn simulate_honey_badger(
mut network: TestNetwork<QueueingHoneyBadger<Transaction, NodeId, Vec<Transaction>>>,
) {
fn simulate_honey_badger(mut network: TestNetwork<QHB>) {
// Handle messages until all nodes have output all transactions.
println!(
"{}",
@ -396,7 +397,7 @@ fn simulate_honey_badger(
let mut epochs = Vec::new();
while let Some(id) = network.step() {
for &(time, ref batch) in &network.nodes[&id].outputs {
let epoch = batch.epoch() as usize;
let epoch = batch.seqnum() as usize;
if epochs.len() <= epoch {
epochs.resize(epoch + 1, EpochInfo::default());
}
@ -436,11 +437,16 @@ fn main() {
.map(|_| Transaction::new(args.flag_tx_size))
.collect();
let new_honey_badger = |netinfo: NetworkInfo<NodeId>| {
let dyn_hb = DynamicHoneyBadger::builder().build(netinfo);
QueueingHoneyBadger::builder(dyn_hb)
let dhb = DynamicHoneyBadger::builder().build(netinfo.clone());
let (qhb, qhb_step) = QueueingHoneyBadger::builder(dhb)
.batch_size(args.flag_b)
.build_with_transactions(txs.clone(), rand::thread_rng().gen::<Isaac64Rng>())
.expect("instantiate QueueingHoneyBadger")
.expect("instantiate QueueingHoneyBadger");
let our_id = *netinfo.our_id();
let peer_ids = netinfo.all_ids().filter(|&&them| them != our_id).cloned();
let (sq, mut step) = SenderQueue::builder(qhb, peer_ids).build(our_id);
step.extend_with(qhb_step, Message::from);
(sq, step)
};
let hw_quality = HwQuality {
latency: Duration::from_millis(args.flag_lag),

View File

@ -2,14 +2,16 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use super::EncryptionSchedule;
use super::{ChangeState, JoinPlan};
use {NetworkInfo, NodeIdT};
use super::{ChangeState, Epoch, JoinPlan};
use {Epoched, NetworkInfo, NodeIdT};
/// A batch of transactions the algorithm has output.
#[derive(Clone, Debug)]
pub struct Batch<C, N> {
/// The sequence number: there is exactly one batch in each epoch.
pub(super) epoch: u64,
pub(super) seqnum: u64,
/// The current `DynamicHoneyBadger` era.
pub(super) era: u64,
/// The user contributions committed in this epoch.
pub(super) contributions: BTreeMap<N, C>,
/// The current state of adding or removing a node: whether any is in progress, or completed
@ -21,9 +23,30 @@ pub struct Batch<C, N> {
pub(super) encryption_schedule: EncryptionSchedule,
}
impl<C, N: NodeIdT> Epoched for Batch<C, N> {
type Epoch = Epoch;
/// Returns the **next** `DynamicHoneyBadger` epoch after the sequential epoch of the batch.
fn epoch(&self) -> Epoch {
let seqnum = self.seqnum;
let era = self.era;
if self.change == ChangeState::None {
Epoch(era, Some(seqnum - era + 1))
} else {
Epoch(seqnum + 1, Some(0))
}
}
}
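To make the mapping from batch sequence numbers to epochs concrete, here is a small standalone sketch (not part of the commit) that mirrors the formula above with plain values, since the `Batch` fields are crate-private:

// Mirrors `Batch::epoch` above; `change_pending` stands for `self.change != ChangeState::None`.
fn next_epoch(era: u64, seqnum: u64, change_pending: bool) -> (u64, Option<u64>) {
    if !change_pending {
        (era, Some(seqnum - era + 1))
    } else {
        (seqnum + 1, Some(0))
    }
}

fn main() {
    // Batch with seqnum 3 in era 0 and no change: the next epoch is HB epoch 4 of era 0.
    assert_eq!(next_epoch(0, 3, false), (0, Some(4)));
    // Any reported change starts a new era: the next epoch is HB epoch 0 of era 4.
    assert_eq!(next_epoch(0, 3, true), (4, Some(0)));
}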
impl<C, N: NodeIdT> Batch<C, N> {
pub fn epoch(&self) -> u64 {
self.epoch
/// Returns the linear epoch of this `DynamicHoneyBadger` batch.
pub fn seqnum(&self) -> u64 {
self.seqnum
}
/// Returns the `DynamicHoneyBadger` era of the batch.
pub fn era(&self) -> u64 {
self.era
}
/// Returns whether any change to the set of participating nodes is in progress or was
@ -89,7 +112,7 @@ impl<C, N: NodeIdT> Batch<C, N> {
return None;
}
Some(JoinPlan {
epoch: self.epoch + 1,
era: self.seqnum + 1,
change: self.change.clone(),
pub_key_set: self.netinfo.public_key_set().clone(),
pub_keys: self.netinfo.public_key_map().clone(),
@ -103,7 +126,8 @@ impl<C, N: NodeIdT> Batch<C, N> {
where
C: PartialEq,
{
self.epoch == other.epoch
self.seqnum == other.seqnum
&& self.era == other.era
&& self.contributions == other.contributions
&& self.change == other.change
&& self.netinfo.public_key_set() == other.netinfo.public_key_set()

View File

@ -16,10 +16,10 @@ use {Contribution, NetworkInfo, NodeIdT};
/// A Dynamic Honey Badger builder, to configure the parameters and create new instances of
/// `DynamicHoneyBadger`.
pub struct DynamicHoneyBadgerBuilder<C, N> {
/// Start in this epoch.
epoch: u64,
/// Start in this era.
era: u64,
/// The maximum number of future epochs for which we handle messages simultaneously.
max_future_epochs: usize,
max_future_epochs: u64,
/// Random number generator passed on to algorithm instance for key generation. Also used to
/// instantiate `HoneyBadger`.
rng: Box<dyn rand::Rng>,
@ -30,11 +30,14 @@ pub struct DynamicHoneyBadgerBuilder<C, N> {
_phantom: PhantomData<(C, N)>,
}
impl<C, N> Default for DynamicHoneyBadgerBuilder<C, N> {
impl<C, N> Default for DynamicHoneyBadgerBuilder<C, N>
where
N: Ord,
{
fn default() -> Self {
// TODO: Use the defaults from `HoneyBadgerBuilder`.
DynamicHoneyBadgerBuilder {
epoch: 0,
era: 0,
max_future_epochs: 3,
rng: Box::new(rand::thread_rng()),
subset_handling_strategy: SubsetHandlingStrategy::Incremental,
@ -55,14 +58,14 @@ where
Self::default()
}
/// Sets the starting epoch to the given value.
pub fn epoch(&mut self, epoch: u64) -> &mut Self {
self.epoch = epoch;
/// Sets the starting era to the given value.
pub fn era(&mut self, era: u64) -> &mut Self {
self.era = era;
self
}
/// Sets the maximum number of future epochs for which we handle messages simultaneously.
pub fn max_future_epochs(&mut self, max_future_epochs: usize) -> &mut Self {
pub fn max_future_epochs(&mut self, max_future_epochs: u64) -> &mut Self {
self.max_future_epochs = max_future_epochs;
self
}
@ -91,18 +94,18 @@ where
/// Creates a new Dynamic Honey Badger instance with an empty buffer.
pub fn build(&mut self, netinfo: NetworkInfo<N>) -> DynamicHoneyBadger<C, N> {
let DynamicHoneyBadgerBuilder {
epoch,
era,
max_future_epochs,
rng,
subset_handling_strategy,
encryption_schedule,
_phantom,
} = self;
let epoch = *epoch;
let era = *era;
let max_future_epochs = *max_future_epochs;
let arc_netinfo = Arc::new(netinfo.clone());
let honey_badger = HoneyBadger::builder(arc_netinfo.clone())
.session_id(epoch)
.session_id(era)
.max_future_epochs(max_future_epochs)
.rng(rng.sub_rng())
.subset_handling_strategy(subset_handling_strategy.clone())
@ -111,12 +114,11 @@ where
DynamicHoneyBadger {
netinfo,
max_future_epochs,
start_epoch: epoch,
era,
vote_counter: VoteCounter::new(arc_netinfo, 0),
key_gen_msg_buffer: Vec::new(),
honey_badger,
key_gen_state: None,
incoming_queue: Vec::new(),
rng: Box::new(rng.sub_rng()),
}
}
@ -155,17 +157,16 @@ where
let mut dhb = DynamicHoneyBadger {
netinfo,
max_future_epochs: self.max_future_epochs,
start_epoch: join_plan.epoch,
vote_counter: VoteCounter::new(arc_netinfo, join_plan.epoch),
era: join_plan.era,
vote_counter: VoteCounter::new(arc_netinfo, join_plan.era),
key_gen_msg_buffer: Vec::new(),
honey_badger,
key_gen_state: None,
incoming_queue: Vec::new(),
rng: Box::new(self.rng.sub_rng()),
};
let step = match join_plan.change {
ChangeState::InProgress(ref change) => match change {
Change::NodeChange(change) => dhb.update_key_gen(join_plan.epoch, change)?,
Change::NodeChange(change) => dhb.update_key_gen(join_plan.era, change)?,
_ => Step::default(),
},
ChangeState::None | ChangeState::Complete(..) => Step::default(),

View File

@ -1,6 +1,6 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use std::{fmt, mem, result};
use std::{fmt, result};
use bincode;
use crypto::Signature;
@ -30,9 +30,9 @@ pub struct DynamicHoneyBadger<C, N: Rand> {
/// Shared network data.
pub(super) netinfo: NetworkInfo<N>,
/// The maximum number of future epochs for which we handle messages simultaneously.
pub(super) max_future_epochs: usize,
pub(super) max_future_epochs: u64,
/// The first epoch after the latest node change.
pub(super) start_epoch: u64,
pub(super) era: u64,
/// The buffer and counter for the pending and committed change votes.
pub(super) vote_counter: VoteCounter<N>,
/// Pending node transactions that we will propose in the next epoch.
@ -41,8 +41,6 @@ pub struct DynamicHoneyBadger<C, N: Rand> {
pub(super) honey_badger: HoneyBadger<InternalContrib<C, N>, N>,
/// The current key generation process, and the change it applies to.
pub(super) key_gen_state: Option<KeyGenState<N>>,
/// A queue for messages from future epochs that cannot be handled yet.
pub(super) incoming_queue: Vec<(N, Message<N>)>,
/// A random number generator used for secret key generation.
// Boxed to avoid overloading the algorithm's type with more generics.
#[derivative(Debug(format_with = "util::fmt_rng"))]
@ -107,7 +105,7 @@ where
let key_gen_messages = self
.key_gen_msg_buffer
.iter()
.filter(|kg_msg| kg_msg.epoch() == self.start_epoch)
.filter(|kg_msg| kg_msg.era() == self.era)
.cloned()
.collect();
let step = self
@ -137,16 +135,7 @@ where
///
/// This must be called with every message we receive from another node.
pub fn handle_message(&mut self, sender_id: &N, message: Message<N>) -> Result<Step<C, N>> {
let epoch = message.start_epoch();
if epoch < self.start_epoch {
// Obsolete message.
Ok(Step::default())
} else if epoch > self.start_epoch {
// Message cannot be handled yet. Save it for later.
let entry = (sender_id.clone(), message);
self.incoming_queue.push(entry);
Ok(Step::default())
} else {
if message.era() == self.era {
match message {
Message::HoneyBadger(_, hb_msg) => {
self.handle_honey_badger_message(sender_id, hb_msg)
@ -159,6 +148,11 @@ where
.add_pending_vote(sender_id, signed_vote)
.map(FaultLog::into),
}
} else if message.era() > self.era {
Ok(Fault::new(sender_id.clone(), FaultKind::UnexpectedDhbMessageEra).into())
} else {
// The message is late; discard it.
Ok(Step::default())
}
}
@ -238,7 +232,7 @@ where
return Ok(Fault::new(sender_id.clone(), fault_kind).into());
}
let tx = SignedKeyGenMsg(self.start_epoch, sender_id.clone(), kg_msg, sig);
let tx = SignedKeyGenMsg(self.era, sender_id.clone(), kg_msg, sig);
self.key_gen_msg_buffer.push(tx);
Ok(FaultLog::default())
}
@ -249,10 +243,10 @@ where
hb_step: honey_badger::Step<InternalContrib<C, N>, N>,
) -> Result<Step<C, N>> {
let mut step: Step<C, N> = Step::default();
let start_epoch = self.start_epoch;
let output = step.extend_with(hb_step, |hb_msg| Message::HoneyBadger(start_epoch, hb_msg));
let output = step.extend_with(hb_step, |hb_msg| Message::HoneyBadger(self.era, hb_msg));
for hb_batch in output {
let batch_epoch = hb_batch.epoch + self.start_epoch;
let batch_era = self.era;
let batch_seqnum = hb_batch.epoch + batch_era;
let mut batch_contributions = BTreeMap::new();
// Add the user transactions to `batch` and handle votes and DKG messages.
@ -267,9 +261,9 @@ where
batch_contributions.insert(id.clone(), contrib);
self.key_gen_msg_buffer
.retain(|skgm| !key_gen_messages.contains(skgm));
for SignedKeyGenMsg(epoch, s_id, kg_msg, sig) in key_gen_messages {
if epoch != self.start_epoch {
let fault_kind = FaultKind::InvalidKeyGenMessageEpoch;
for SignedKeyGenMsg(era, s_id, kg_msg, sig) in key_gen_messages {
if era != self.era {
let fault_kind = FaultKind::InvalidKeyGenMessageEra;
step.fault_log.append(id.clone(), fault_kind);
} else if !self.verify_signature(&s_id, &sig, &kg_msg)? {
let fault_kind = FaultKind::InvalidKeyGenMessageSignature;
@ -282,19 +276,18 @@ where
}
}
}
let change = if let Some(kgs) = self.take_ready_key_gen() {
// If DKG completed, apply the change, restart Honey Badger, and inform the user.
debug!("{}: DKG for {:?} complete!", self, kgs.change);
self.netinfo = kgs.key_gen.into_network_info()?;
self.restart_honey_badger(batch_epoch + 1, None);
self.restart_honey_badger(batch_seqnum + 1, None);
ChangeState::Complete(Change::NodeChange(kgs.change))
} else if let Some(change) = self.vote_counter.compute_winner().cloned() {
// If there is a new change, restart DKG. Inform the user about the current change.
step.extend(match &change {
Change::NodeChange(change) => self.update_key_gen(batch_epoch + 1, &change)?,
Change::NodeChange(change) => self.update_key_gen(batch_seqnum + 1, &change)?,
Change::EncryptionSchedule(schedule) => {
self.update_encryption_schedule(batch_epoch + 1, *schedule)?
self.update_encryption_schedule(batch_seqnum + 1, *schedule)?
}
});
match change {
@ -305,29 +298,23 @@ where
ChangeState::None
};
step.output.push(Batch {
epoch: batch_epoch,
seqnum: batch_seqnum,
era: batch_era,
change,
netinfo: Arc::new(self.netinfo.clone()),
contributions: batch_contributions,
encryption_schedule: self.honey_badger.get_encryption_schedule(),
});
}
// If `start_epoch` changed, we can now handle some queued messages.
if start_epoch < self.start_epoch {
let queue = mem::replace(&mut self.incoming_queue, Vec::new());
for (sender_id, msg) in queue {
step.extend(self.handle_message(&sender_id, msg)?);
}
}
Ok(step)
}
pub(super) fn update_encryption_schedule(
&mut self,
epoch: u64,
era: u64,
encryption_schedule: EncryptionSchedule,
) -> Result<Step<C, N>> {
self.restart_honey_badger(epoch, Some(encryption_schedule));
self.restart_honey_badger(era, Some(encryption_schedule));
Ok(Step::default())
}
@ -335,7 +322,7 @@ where
/// by the current change.
pub(super) fn update_key_gen(
&mut self,
epoch: u64,
era: u64,
change: &NodeChange<N>,
) -> Result<Step<C, N>> {
if self.key_gen_state.as_ref().map(|kgs| &kgs.change) == Some(change) {
@ -350,7 +337,7 @@ where
} {
warn!("{}: No-op change: {:?}", self, change);
}
self.restart_honey_badger(epoch, None);
self.restart_honey_badger(era, None);
// TODO: This needs to be the same as `num_faulty` will be in the _new_
// `NetworkInfo` if the change goes through. It would be safer to deduplicate.
let threshold = (pub_keys.len() - 1) / 3;
@ -366,17 +353,13 @@ where
}
/// Starts a new `HoneyBadger` instance and resets the vote counter.
fn restart_honey_badger(
&mut self,
epoch: u64,
encryption_schedule: Option<EncryptionSchedule>,
) {
self.start_epoch = epoch;
self.key_gen_msg_buffer.retain(|kg_msg| kg_msg.0 >= epoch);
fn restart_honey_badger(&mut self, era: u64, encryption_schedule: Option<EncryptionSchedule>) {
self.era = era;
self.key_gen_msg_buffer.retain(|kg_msg| kg_msg.0 >= era);
let netinfo = Arc::new(self.netinfo.clone());
self.vote_counter = VoteCounter::new(netinfo.clone(), epoch);
self.vote_counter = VoteCounter::new(netinfo.clone(), era);
self.honey_badger = HoneyBadger::builder(netinfo)
.session_id(epoch)
.session_id(era)
.max_future_epochs(self.max_future_epochs)
.rng(self.rng.sub_rng())
.encryption_schedule(
@ -434,11 +417,10 @@ where
let sig = Box::new(self.netinfo.secret_key().sign(ser));
if self.netinfo.is_validator() {
let our_id = self.our_id().clone();
let signed_msg =
SignedKeyGenMsg(self.start_epoch, our_id, kg_msg.clone(), *sig.clone());
let signed_msg = SignedKeyGenMsg(self.era, our_id, kg_msg.clone(), *sig.clone());
self.key_gen_msg_buffer.push(signed_msg);
}
let msg = Message::KeyGen(self.start_epoch, kg_msg, sig);
let msg = Message::KeyGen(self.era, kg_msg, sig);
Ok(Target::All.message(msg).into())
}
@ -479,6 +461,11 @@ where
let pk_opt = self.netinfo.public_key(node_id).or_else(get_candidate_key);
Ok(pk_opt.map_or(false, |pk| pk.verify(&sig, ser)))
}
/// Returns the maximum number of future epochs of the Honey Badger algorithm instance.
pub fn max_future_epochs(&self) -> u64 {
self.max_future_epochs
}
}
impl<C, N> fmt::Display for DynamicHoneyBadger<C, N>
@ -487,6 +474,6 @@ where
N: NodeIdT + Serialize + DeserializeOwned + Rand,
{
fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
write!(f, "{:?} DHB(era: {})", self.our_id(), self.start_epoch)
write!(f, "{:?} DHB(era: {})", self.our_id(), self.era)
}
}

View File

@ -62,16 +62,20 @@ mod dynamic_honey_badger;
mod error;
mod votes;
pub mod sender_queueable;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use crypto::{PublicKey, PublicKeySet, Signature};
use rand::Rand;
use serde_derive::{Deserialize, Serialize};
use std::collections::BTreeMap;
use self::votes::{SignedVote, VoteCounter};
use super::threshold_decryption::EncryptionSchedule;
use honey_badger::Message as HbMessage;
use sync_key_gen::{Ack, Part, SyncKeyGen};
use NodeIdT;
use {Epoched, NodeIdT};
pub use self::batch::Batch;
pub use self::builder::DynamicHoneyBadgerBuilder;
@ -113,19 +117,54 @@ pub enum Message<N: Rand> {
}
impl<N: Rand> Message<N> {
fn start_epoch(&self) -> u64 {
fn era(&self) -> u64 {
match *self {
Message::HoneyBadger(epoch, _) => epoch,
Message::KeyGen(epoch, _, _) => epoch,
Message::HoneyBadger(era, _) => era,
Message::KeyGen(era, _, _) => era,
Message::SignedVote(ref signed_vote) => signed_vote.era(),
}
}
}
pub fn epoch(&self) -> u64 {
/// A `DynamicHoneyBadger` epoch. It consists of an era and an optional epoch of the `HoneyBadger`
/// instance that started in that era. For messages originating from `DynamicHoneyBadger` itself, as
/// opposed to the wrapped `HoneyBadger`, that `HoneyBadger` epoch is `None`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, Serialize, Deserialize)]
pub struct Epoch(pub(super) u64, pub(super) Option<u64>);
impl PartialOrd for Epoch {
/// Partial ordering on epochs. For any `era` and `hb_epoch`, two epochs `Epoch(era, None)` and `Epoch(era,
/// Some(hb_epoch))` are incomparable.
fn partial_cmp(&self, other: &Epoch) -> Option<Ordering> {
let (&Epoch(a, b), &Epoch(c, d)) = (self, other);
if a < c {
Some(Ordering::Less)
} else if a > c {
Some(Ordering::Greater)
} else if b.is_none() && d.is_none() {
Some(Ordering::Equal)
} else if let (Some(b), Some(d)) = (b, d) {
Some(Ord::cmp(&b, &d))
} else {
None
}
}
}
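A hypothetical in-module test (not part of this commit, and assuming it lives inside the `dynamic_honey_badger` module so the crate-private fields are visible) illustrating the ordering defined above:

#[cfg(test)]
mod epoch_ordering_tests {
    use std::cmp::Ordering;

    use super::Epoch;

    #[test]
    fn partial_order_on_epochs() {
        // Within the same era, `HoneyBadger` epochs compare numerically.
        assert!(Epoch(1, Some(2)) < Epoch(1, Some(3)));
        // Two era-only epochs of the same era are equal.
        assert_eq!(Epoch(1, None).partial_cmp(&Epoch(1, None)), Some(Ordering::Equal));
        // An era-only epoch and an HB epoch of the same era are incomparable.
        assert_eq!(Epoch(1, None).partial_cmp(&Epoch(1, Some(0))), None);
        // A smaller era is less, regardless of the HB component.
        assert!(Epoch(0, Some(7)) < Epoch(1, None));
    }
}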
impl Default for Epoch {
fn default() -> Epoch {
Epoch(0, Some(0))
}
}
impl<N: Rand> Epoched for Message<N> {
type Epoch = Epoch;
fn epoch(&self) -> Epoch {
match *self {
Message::HoneyBadger(start_epoch, ref msg) => start_epoch + msg.epoch(),
Message::KeyGen(epoch, _, _) => epoch,
Message::SignedVote(ref signed_vote) => signed_vote.era(),
Message::HoneyBadger(era, ref msg) => Epoch(era, Some(msg.epoch())),
Message::KeyGen(era, _, _) => Epoch(era, None),
Message::SignedVote(ref signed_vote) => Epoch(signed_vote.era(), None),
}
}
}
@ -136,7 +175,7 @@ impl<N: Rand> Message<N> {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct JoinPlan<N: Ord> {
/// The first epoch the new node will observe.
epoch: u64,
era: u64,
/// The current change. If `InProgress`, key generation for it is beginning at `epoch`.
change: ChangeState<N>,
/// The current public key set for threshold cryptography.
@ -208,8 +247,8 @@ struct InternalContrib<C, N> {
struct SignedKeyGenMsg<N>(u64, N, KeyGenMessage, Signature);
impl<N> SignedKeyGenMsg<N> {
/// Returns the start epoch of the ongoing key generation.
fn epoch(&self) -> u64 {
/// Returns the era of the ongoing key generation.
fn era(&self) -> u64 {
self.0
}
}

View File

@ -0,0 +1,75 @@
use log::error;
use rand::Rand;
use serde::{de::DeserializeOwned, Serialize};
use super::{Batch, Change, ChangeState, DynamicHoneyBadger, Epoch, Message, NodeChange};
use sender_queue::{
SenderQueueableDistAlgorithm, SenderQueueableEpoch, SenderQueueableMessage,
SenderQueueableOutput,
};
use {Contribution, Epoched, NodeIdT};
impl<C, N> SenderQueueableOutput<N, Message<N>> for Batch<C, N>
where
C: Contribution,
N: NodeIdT + Rand,
{
fn added_node(&self) -> Option<N> {
if let ChangeState::InProgress(Change::NodeChange(NodeChange::Add(ref id, _))) = self.change
{
// Register the new node to send broadcast messages to it from now on.
Some(id.clone())
} else {
None
}
}
fn convert_epoch(&self) -> Epoch {
self.epoch()
}
}
impl<N> SenderQueueableMessage for Message<N>
where
N: Rand,
{
fn is_accepted(&self, Epoch(them_era, them_hb_epoch): Epoch, max_future_epochs: u64) -> bool {
let Epoch(era, hb_epoch) = self.epoch();
if era != them_era {
return false;
}
match (hb_epoch, them_hb_epoch) {
(Some(us), Some(them)) => them <= us && us <= them + max_future_epochs,
(None, Some(_)) => true,
(_, None) => {
error!("Peer's Honey Badger epoch undefined");
false
}
}
}
fn is_obsolete(&self, Epoch(them_era, them_hb_epoch): Epoch) -> bool {
let Epoch(era, hb_epoch) = self.epoch();
era < them_era || (era == them_era && hb_epoch.is_some() && hb_epoch < them_hb_epoch)
}
}
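A minimal standalone sketch (not part of the commit) of these acceptance rules, using plain `(era, Option<hb_epoch>)` pairs in place of `Epoch` and assuming `max_future_epochs = 3`:

fn main() {
    let max_future_epochs: u64 = 3;
    // Mirrors `is_accepted` above; `ours` is the message epoch, `them` the peer's last known epoch.
    let accepted = |ours: (u64, Option<u64>), them: (u64, Option<u64>)| -> bool {
        if ours.0 != them.0 {
            return false;
        }
        match (ours.1, them.1) {
            (Some(us), Some(them_hb)) => them_hb <= us && us <= them_hb + max_future_epochs,
            (None, Some(_)) => true,
            (_, None) => false, // the peer's Honey Badger epoch is undefined
        }
    };
    // A `KeyGen` or `SignedVote` message (no HB component) only needs a matching era.
    assert!(accepted((2, None), (2, Some(7))));
    // A `HoneyBadger` message must fall inside the window of `max_future_epochs`.
    assert!(accepted((2, Some(9)), (2, Some(7))));
    assert!(!accepted((2, Some(11)), (2, Some(7))));
    // Messages from a different era are never accepted.
    assert!(!accepted((1, Some(9)), (2, Some(7))));
}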
impl SenderQueueableEpoch for Epoch {
fn spanning_epochs(&self) -> Vec<Self> {
if let Epoch(era, Some(_)) = *self {
vec![Epoch(era, None)]
} else {
vec![]
}
}
}
impl<C, N> SenderQueueableDistAlgorithm for DynamicHoneyBadger<C, N>
where
C: Contribution + Serialize + DeserializeOwned,
N: NodeIdT + Serialize + DeserializeOwned + Rand,
{
fn max_future_epochs(&self) -> u64 {
self.max_future_epochs()
}
}

View File

@ -20,6 +20,8 @@ pub enum FaultKind {
DeserializeCiphertext,
/// `HoneyBadger` received an invalid ciphertext from the proposer.
InvalidCiphertext,
/// `HoneyBadger` received a message with an invalid epoch.
UnexpectedHbMessageEpoch,
/// `ThresholdDecryption` received multiple shares from the same sender.
MultipleDecryptionShares,
/// `Broadcast` received a `Value` from a node other than the proposer.
@ -39,8 +41,8 @@ pub enum FaultKind {
BatchDeserializationFailed,
/// `DynamicHoneyBadger` received a key generation message with an invalid signature.
InvalidKeyGenMessageSignature,
/// `DynamicHoneyBadger` received a key generation message with an invalid epoch.
InvalidKeyGenMessageEpoch,
/// `DynamicHoneyBadger` received a key generation message with an invalid era.
InvalidKeyGenMessageEra,
/// `DynamicHoneyBadger` received a key generation message when there was no key generation in
/// progress.
UnexpectedKeyGenMessage,
@ -61,6 +63,8 @@ pub enum FaultKind {
InvalidVoteSignature,
/// A validator committed an invalid vote in `DynamicHoneyBadger`.
InvalidCommittedVote,
/// `DynamicHoneyBadger` received a message with an invalid era.
UnexpectedDhbMessageEra,
/// `BinaryAgreement` received a duplicate `BVal` message.
DuplicateBVal,
/// `BinaryAgreement` received a duplicate `Aux` message.

View File

@ -1,6 +1,6 @@
use std::collections::BTreeMap;
use NodeIdT;
use {Epoched, NodeIdT};
/// A batch of contributions the algorithm has output.
#[derive(Clone, Debug)]
@ -9,6 +9,15 @@ pub struct Batch<C, N> {
pub contributions: BTreeMap<N, C>,
}
impl<C, N: NodeIdT> Epoched for Batch<C, N> {
type Epoch = u64;
/// Returns the **next** `HoneyBadger` epoch after the sequential epoch of the batch.
fn epoch(&self) -> u64 {
self.epoch + 1
}
}
impl<C, N: NodeIdT> Batch<C, N> {
/// Returns an iterator over references to all transactions included in the batch.
pub fn iter<'a>(&'a self) -> impl Iterator<Item = <&'a C as IntoIterator>::Item>

View File

@ -21,7 +21,7 @@ pub struct HoneyBadgerBuilder<C, N> {
/// Start in this epoch.
epoch: u64,
/// The maximum number of future epochs for which we handle messages simultaneously.
max_future_epochs: usize,
max_future_epochs: u64,
/// Random number generator passed on to algorithm instance for signing and encrypting.
rng: Box<dyn Rng>,
/// Strategy used to handle the output of the `Subset` algorithm.
@ -73,7 +73,7 @@ where
}
/// Sets the maximum number of future epochs for which we handle messages simultaneously.
pub fn max_future_epochs(&mut self, max_future_epochs: usize) -> &mut Self {
pub fn max_future_epochs(&mut self, max_future_epochs: u64) -> &mut Self {
self.max_future_epochs = max_future_epochs;
self
}
@ -102,7 +102,6 @@ where
has_input: false,
epochs: BTreeMap::new(),
max_future_epochs: self.max_future_epochs as u64,
incoming_queue: BTreeMap::new(),
rng: Box::new(self.rng.sub_rng()),
subset_handling_strategy: self.subset_handling_strategy.clone(),
encryption_schedule: self.encryption_schedule,

View File

@ -3,12 +3,13 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use derivative::Derivative;
use log::debug;
use rand::{Rand, Rng};
use serde::{de::DeserializeOwned, Serialize};
use super::epoch_state::EpochState;
use super::{Batch, Error, ErrorKind, HoneyBadgerBuilder, Message, MessageContent, Result};
use {util, Contribution, DistAlgorithm, NetworkInfo, NodeIdT};
use super::{Batch, Error, ErrorKind, HoneyBadgerBuilder, Message, Result};
use {util, Contribution, DistAlgorithm, Epoched, Fault, FaultKind, NetworkInfo, NodeIdT};
pub use super::epoch_state::SubsetHandlingStrategy;
use threshold_decryption::EncryptionSchedule;
@ -30,8 +31,6 @@ pub struct HoneyBadger<C, N: Rand> {
pub(super) epochs: BTreeMap<u64, EpochState<C, N>>,
/// The maximum number of `Subset` instances that we run simultaneously.
pub(super) max_future_epochs: u64,
/// Messages for future epochs that couldn't be handled yet.
pub(super) incoming_queue: BTreeMap<u64, Vec<(N, MessageContent<N>)>>,
/// A random number generator used for secret key generation.
// Boxed to avoid overloading the algorithm's type with more generics.
#[derivative(Debug(format_with = "util::fmt_rng"))]
@ -42,6 +41,14 @@ pub struct HoneyBadger<C, N: Rand> {
pub(super) encryption_schedule: EncryptionSchedule,
}
impl<C, N: Rand> Epoched for HoneyBadger<C, N> {
type Epoch = u64;
fn epoch(&self) -> Self::Epoch {
self.epoch
}
}
pub type Step<C, N> = ::Step<HoneyBadger<C, N>>;
impl<C, N> DistAlgorithm for HoneyBadger<C, N>
@ -116,19 +123,17 @@ where
return Err(ErrorKind::UnknownSender.into());
}
let Message { epoch, content } = message;
if epoch > self.epoch + self.max_future_epochs {
// Postpone handling this message.
self.incoming_queue
.entry(epoch)
.or_insert_with(Vec::new)
.push((sender_id.clone(), content));
} else if self.epoch <= epoch {
if self.epoch <= epoch && epoch <= self.epoch + self.max_future_epochs {
let step = self
.epoch_state_mut(epoch)?
.handle_message_content(sender_id, content)?;
return Ok(step.join(self.try_output_batches()?));
} // And ignore all messages from past epochs.
Ok(Step::default())
Ok(step.join(self.try_output_batches()?))
} else if epoch > self.epoch + self.max_future_epochs {
Ok(Fault::new(sender_id.clone(), FaultKind::UnexpectedHbMessageEpoch).into())
} else {
// The message is late; discard it.
Ok(Step::default())
}
}
/// Returns `true` if input for the current epoch has already been provided.
@ -149,20 +154,12 @@ where
}
/// Increments the epoch number and clears any state that is local to the finished epoch.
fn update_epoch(&mut self) -> Result<Step<C, N>> {
fn update_epoch(&mut self) {
// Clear the state of the old epoch.
self.epochs.remove(&self.epoch);
self.epoch += 1;
self.has_input = false;
let max_epoch = self.epoch + self.max_future_epochs;
let mut step = Step::default();
if let Some(messages) = self.incoming_queue.remove(&max_epoch) {
let epoch_state = self.epoch_state_mut(max_epoch)?;
for (sender_id, content) in messages {
step.extend(epoch_state.handle_message_content(&sender_id, content)?);
}
}
Ok(step)
debug!("Started epoch {:?}", self.epoch);
}
/// Tries to decrypt contributions from all proposers and output those in a batch.
@ -176,7 +173,7 @@ where
// Queue the output and advance the epoch.
step.output.push(batch);
step.fault_log.extend(fault_log);
step.extend(self.update_epoch()?);
self.update_epoch();
}
Ok(step)
}
@ -195,4 +192,9 @@ where
)?),
})
}
/// Returns the maximum number of future epochs of the Honey Badger algorithm instance.
pub fn max_future_epochs(&self) -> u64 {
self.max_future_epochs
}
}

View File

@ -5,6 +5,8 @@ use serde_derive::{Deserialize, Serialize};
use subset;
use threshold_decryption;
use Epoched;
/// The content of a `HoneyBadger` message. It should be further annotated with an epoch.
#[derive(Clone, Debug, Deserialize, Rand, Serialize)]
pub enum MessageContent<N: Rand> {
@ -33,8 +35,10 @@ pub struct Message<N: Rand> {
pub(super) content: MessageContent<N>,
}
impl<N: Rand> Message<N> {
pub fn epoch(&self) -> u64 {
impl<N: Rand> Epoched for Message<N> {
type Epoch = u64;
fn epoch(&self) -> u64 {
self.epoch
}
}

View File

@ -29,6 +29,8 @@ mod error;
mod honey_badger;
mod message;
pub mod sender_queueable;
pub use self::batch::Batch;
pub use self::builder::HoneyBadgerBuilder;
pub use self::error::{Error, ErrorKind, Result};

View File

@ -0,0 +1,53 @@
use rand::Rand;
use serde::{de::DeserializeOwned, Serialize};
use super::{Batch, HoneyBadger, Message};
use sender_queue::{
SenderQueueableDistAlgorithm, SenderQueueableEpoch, SenderQueueableMessage,
SenderQueueableOutput,
};
use {Contribution, Epoched, NodeIdT};
impl<C, N> SenderQueueableOutput<N, Message<N>> for Batch<C, N>
where
C: Contribution,
N: NodeIdT + Rand,
{
fn added_node(&self) -> Option<N> {
None
}
fn convert_epoch(&self) -> u64 {
self.epoch()
}
}
impl<N> SenderQueueableMessage for Message<N>
where
N: Rand,
{
fn is_accepted(&self, them: u64, max_future_epochs: u64) -> bool {
let our_epoch = self.epoch();
them <= our_epoch && our_epoch <= them + max_future_epochs
}
fn is_obsolete(&self, them: u64) -> bool {
self.epoch() < them
}
}
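For the plain `u64` epochs of `HoneyBadger`, the window reduces to a simple range check. A small sketch (not part of the commit), assuming a peer at epoch 5 and `max_future_epochs = 3`:

fn main() {
    let them: u64 = 5;
    let max_future_epochs: u64 = 3;
    let is_accepted = |epoch: u64| them <= epoch && epoch <= them + max_future_epochs;
    let is_obsolete = |epoch: u64| epoch < them;
    // Epochs 5 through 8 are accepted; epoch 4 is obsolete; epoch 9 is deferred for later.
    assert!(is_accepted(5) && is_accepted(8));
    assert!(!is_accepted(9) && !is_obsolete(9));
    assert!(is_obsolete(4));
}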
impl SenderQueueableEpoch for u64 {
fn spanning_epochs(&self) -> Vec<Self> {
vec![]
}
}
impl<C, N> SenderQueueableDistAlgorithm for HoneyBadger<C, N>
where
C: Contribution + Serialize + DeserializeOwned,
N: NodeIdT + Rand,
{
fn max_future_epochs(&self) -> u64 {
self.max_future_epochs()
}
}

View File

@ -141,6 +141,7 @@ pub mod broadcast;
pub mod dynamic_honey_badger;
pub mod honey_badger;
pub mod queueing_honey_badger;
pub mod sender_queue;
pub mod subset;
pub mod sync_key_gen;
pub mod threshold_decryption;
@ -152,4 +153,4 @@ pub use crypto::pairing;
pub use fault_log::{Fault, FaultKind, FaultLog};
pub use messaging::{SourcedMessage, Target, TargetedMessage};
pub use network_info::NetworkInfo;
pub use traits::{Contribution, DistAlgorithm, Message, NodeIdT, SessionIdT, Step};
pub use traits::{Contribution, DistAlgorithm, Epoched, Message, NodeIdT, SessionIdT, Step};

View File

@ -22,6 +22,8 @@
//! entries, any two nodes will likely make almost disjoint contributions instead of proposing
//! the same transaction multiple times.
pub mod sender_queueable;
use std::fmt::{self, Display};
use std::marker::PhantomData;
use std::{cmp, iter};
@ -35,7 +37,7 @@ use dynamic_honey_badger::{self, Batch as DhbBatch, DynamicHoneyBadger, Message}
use transaction_queue::TransactionQueue;
use {util, Contribution, DistAlgorithm, NodeIdT};
pub use dynamic_honey_badger::{Change, ChangeState, Input, NodeChange};
pub use dynamic_honey_badger::{Change, ChangeState, Epoch, Input, NodeChange};
/// Queueing honey badger error variants.
#[derive(Debug, Fail)]
@ -177,7 +179,7 @@ where
pub struct QueueingHoneyBadger<T, N: Rand, Q> {
/// The target number of transactions to be included in each batch.
batch_size: usize,
/// The internal `DynamicHoneyBadger` instance.
/// The internal managed `DynamicHoneyBadger` instance.
dyn_hb: DynamicHoneyBadger<Vec<T>, N>,
/// The queue of pending transactions that haven't been output in a batch yet.
queue: Q,
@ -275,7 +277,7 @@ where
Ok(step.join(self.propose()?))
}
/// Returns a reference to the internal `DynamicHoneyBadger` instance.
/// Returns a reference to the internal managed `DynamicHoneyBadger` instance.
pub fn dyn_hb(&self) -> &DynamicHoneyBadger<Vec<T>, N> {
&self.dyn_hb
}

View File

@ -0,0 +1,18 @@
use rand::Rand;
use serde::{de::DeserializeOwned, Serialize};
use super::QueueingHoneyBadger;
use sender_queue::SenderQueueableDistAlgorithm;
use transaction_queue::TransactionQueue;
use {Contribution, NodeIdT};
impl<T, N, Q> SenderQueueableDistAlgorithm for QueueingHoneyBadger<T, N, Q>
where
T: Contribution + Serialize + DeserializeOwned + Clone,
N: NodeIdT + Serialize + DeserializeOwned + Rand,
Q: TransactionQueue<T>,
{
fn max_future_epochs(&self) -> u64 {
self.dyn_hb.max_future_epochs()
}
}

View File

@ -0,0 +1,46 @@
use rand::{Rand, Rng};
use serde_derive::{Deserialize, Serialize};
use Epoched;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum Message<M: Epoched> {
EpochStarted(<M as Epoched>::Epoch),
Algo(M),
}
impl<M> Rand for Message<M>
where
M: Epoched + Rand,
<M as Epoched>::Epoch: Rand,
{
fn rand<R: Rng>(rng: &mut R) -> Self {
let message_type = *rng.choose(&["epoch", "algo"]).unwrap();
match message_type {
"epoch" => Message::EpochStarted(rng.gen()),
"algo" => Message::Algo(rng.gen()),
_ => unreachable!(),
}
}
}
impl<M> Epoched for Message<M>
where
M: Epoched,
{
type Epoch = <M as Epoched>::Epoch;
fn epoch(&self) -> Self::Epoch {
match self {
Message::EpochStarted(epoch) => *epoch,
Message::Algo(message) => message.epoch(),
}
}
}
impl<M: Epoched> From<M> for Message<M> {
fn from(message: M) -> Self {
Message::Algo(message)
}
}

src/sender_queue/mod.rs (new file, 361 lines)
View File

@ -0,0 +1,361 @@
//! # Sender queue
//!
//! A sender queue allows a `DistAlgorithm` that outputs `Epoched` messages to buffer those outgoing
//! messages based on their epochs. A message is sent to its recipient only when the recipient's
//! epoch matches the epoch of the message. Thus no queueing is required for incoming messages since
//! any incoming messages with non-matching epochs can be safely discarded.
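As a usage sketch, mirroring the simulation example updated earlier in this commit: wrapping a `QueueingHoneyBadger` in a sender queue looks roughly as follows, assuming `netinfo: NetworkInfo<NodeId>`, a transaction list `txs`, a `batch_size` and an `rng` are in scope:

use hbbft::dynamic_honey_badger::DynamicHoneyBadger;
use hbbft::queueing_honey_badger::QueueingHoneyBadger;
use hbbft::sender_queue::{Message, SenderQueue};

// Build the algorithm instance to be managed by the sender queue.
let dhb = DynamicHoneyBadger::builder().build(netinfo.clone());
let (qhb, qhb_step) = QueueingHoneyBadger::builder(dhb)
    .batch_size(batch_size)
    .build_with_transactions(txs.clone(), rng)
    .expect("instantiate QueueingHoneyBadger");
// Wrap it in a sender queue that tracks every peer's epoch.
let our_id = *netinfo.our_id();
let peer_ids = netinfo.all_ids().filter(|&&them| them != our_id).cloned();
let (sq, mut step) = SenderQueue::builder(qhb, peer_ids).build(our_id);
// Lift the wrapped algorithm's initial step into sender queue messages.
step.extend_with(qhb_step, Message::from);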
mod message;
use std::collections::BTreeMap;
use std::fmt::Debug;
use rand::Rand;
use serde::{de::DeserializeOwned, Serialize};
use {DistAlgorithm, Epoched, NodeIdT, Target};
pub use self::message::Message;
pub trait SenderQueueableMessage: Epoched {
/// Whether the message is accepted in epoch `them`.
fn is_accepted(&self, them: <Self as Epoched>::Epoch, max_future_epochs: u64) -> bool;
/// Whether the epoch of the message is behind `them`.
fn is_obsolete(&self, them: <Self as Epoched>::Epoch) -> bool;
}
pub trait SenderQueueableOutput<N, M>: Epoched
where
N: NodeIdT,
M: Epoched,
{
/// Returns an optional new node added with the batch. This node should be added to the set of
/// all nodes.
fn added_node(&self) -> Option<N>;
/// Converts the batch epoch into a fixed epoch type.
fn convert_epoch(&self) -> <M as Epoched>::Epoch;
}
pub trait SenderQueueableEpoch
where
Self: Sized,
{
/// A _spanning epoch_ of an epoch `e` is an epoch `e0` such that
///
/// - `e` and `e0` are incomparable by the partial ordering on epochs and
///
/// - the duration of `e0` is at least that of `e`.
///
/// Returned is a list of spanning epochs for the given epoch.
///
/// For example, any `DynamicHoneyBadger` epoch `Epoch(x, Some(y))` has a unique spanning
/// epoch `Epoch(x, None)`. In turn, no epoch `Epoch(x, None)` has a spanning epoch.
fn spanning_epochs(&self) -> Vec<Self>;
}
pub trait SenderQueueableDistAlgorithm
where
Self: DistAlgorithm,
{
/// The maximum number of subsequent future epochs that the `DistAlgorithm` is allowed to handle
/// messages for.
fn max_future_epochs(&self) -> u64;
}
pub type OutgoingQueue<D> = BTreeMap<
(
<D as DistAlgorithm>::NodeId,
<<D as DistAlgorithm>::Message as Epoched>::Epoch,
),
Vec<<D as DistAlgorithm>::Message>,
>;
/// An instance of `DistAlgorithm` wrapped with a queue of outgoing messages, that is, a sender
/// queue. This wrapping ensures that the messages sent to remote instances lead to progress of the
/// entire consensus network. In particular, messages to lagging remote nodes are queued and sent
/// only when those nodes' epochs match the queued messages' epochs. Thus all nodes can handle
/// incoming messages without queueing them and can ignore messages whose epochs are not currently
/// accepted.
#[derive(Debug)]
pub struct SenderQueue<D>
where
D: SenderQueueableDistAlgorithm,
D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
D::NodeId: NodeIdT + Rand,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
{
/// The managed `DistAlgorithm` instance.
algo: D,
/// Our node ID.
our_id: D::NodeId,
/// Current epoch.
epoch: <D::Message as Epoched>::Epoch,
/// Messages that couldn't be handled yet by remote nodes.
outgoing_queue: OutgoingQueue<D>,
/// The set of all remote nodes on the network, including validators as well as non-validator
/// (observer) nodes, together with their epochs as of the last communication.
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::Epoch>,
}
pub type Step<D> = ::Step<SenderQueue<D>>;
pub type Result<T, D> = ::std::result::Result<T, <D as DistAlgorithm>::Error>;
impl<D> DistAlgorithm for SenderQueue<D>
where
D: SenderQueueableDistAlgorithm + Debug + Send + Sync,
D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
D::NodeId: NodeIdT + Rand,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
<D::Message as Epoched>::Epoch: SenderQueueableEpoch,
{
type NodeId = D::NodeId;
type Input = D::Input;
type Output = D::Output;
type Message = Message<D::Message>;
type Error = D::Error;
fn handle_input(&mut self, input: Self::Input) -> Result<Step<D>, D> {
self.handle_input(input)
}
fn handle_message(
&mut self,
sender_id: &D::NodeId,
message: Self::Message,
) -> Result<Step<D>, D> {
self.handle_message(sender_id, message)
}
fn terminated(&self) -> bool {
false
}
fn our_id(&self) -> &D::NodeId {
&self.our_id
}
}
impl<D> SenderQueue<D>
where
D: SenderQueueableDistAlgorithm + Debug + Send + Sync,
D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
D::NodeId: NodeIdT + Rand,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
<D::Message as Epoched>::Epoch: SenderQueueableEpoch,
{
/// Returns a new `SenderQueueBuilder` configured to manage a given instance of the wrapped algorithm.
pub fn builder<I>(algo: D, peer_ids: I) -> SenderQueueBuilder<D>
where
I: Iterator<Item = D::NodeId>,
{
SenderQueueBuilder::new(algo, peer_ids)
}
pub fn handle_input(&mut self, input: D::Input) -> Result<Step<D>, D> {
let mut step = self.algo.handle_input(input)?;
let mut sender_queue_step = self.update_epoch(&step);
self.defer_messages(&mut step);
sender_queue_step.extend(step.map(|output| output, Message::from));
Ok(sender_queue_step)
}
pub fn handle_message(
&mut self,
sender_id: &D::NodeId,
message: Message<D::Message>,
) -> Result<Step<D>, D> {
match message {
Message::EpochStarted(epoch) => Ok(self.handle_epoch_started(sender_id, epoch)),
Message::Algo(msg) => self.handle_message_content(sender_id, msg),
}
}
/// Handles an epoch start announcement.
fn handle_epoch_started(
&mut self,
sender_id: &D::NodeId,
epoch: <D::Message as Epoched>::Epoch,
) -> Step<D> {
self.peer_epochs
.entry(sender_id.clone())
.and_modify(|e| {
if *e < epoch {
*e = epoch;
}
}).or_insert(epoch);
self.remove_earlier_messages(sender_id, epoch);
self.process_new_epoch(sender_id, epoch)
}
/// Removes all messages queued for the remote node from epochs up to `epoch`.
fn remove_earlier_messages(
&mut self,
sender_id: &D::NodeId,
epoch: <D::Message as Epoched>::Epoch,
) {
let earlier_keys: Vec<_> = self
.outgoing_queue
.keys()
.cloned()
.filter(|(id, this_epoch)| id == sender_id && *this_epoch < epoch)
.collect();
for key in earlier_keys {
self.outgoing_queue.remove(&key);
}
}
/// Processes an announcement of a new epoch update received from a remote node.
fn process_new_epoch(
&mut self,
sender_id: &D::NodeId,
epoch: <D::Message as Epoched>::Epoch,
) -> Step<D> {
// Send any HB messages for the HB epoch.
let mut ready_messages = self
.outgoing_queue
.remove(&(sender_id.clone(), epoch))
.unwrap_or_default();
for u in epoch.spanning_epochs() {
// Send any DHB messages for the DHB era.
ready_messages.extend(
self.outgoing_queue
.remove(&(sender_id.clone(), u))
.unwrap_or_default(),
);
}
Step::from(
ready_messages
.into_iter()
.map(|msg| Target::Node(sender_id.clone()).message(Message::Algo(msg))),
)
}
/// Handles a message of the managed algorithm in a given epoch.
fn handle_message_content(
&mut self,
sender_id: &D::NodeId,
content: D::Message,
) -> Result<Step<D>, D> {
let mut step = self.algo.handle_message(sender_id, content)?;
let mut sender_queue_step = self.update_epoch(&step);
self.defer_messages(&mut step);
sender_queue_step.extend(step.map(|output| output, Message::from));
Ok(sender_queue_step)
}
/// Updates the current epoch of the managed algorithm.
fn update_epoch(&mut self, step: &::Step<D>) -> Step<D> {
// Look up `DynamicHoneyBadger` epoch updates and collect any added peers.
let new_epoch = step.output.iter().fold(self.epoch, |epoch, batch| {
let max_epoch = epoch.max(batch.convert_epoch());
if let Some(node) = batch.added_node() {
if &node != self.our_id() {
self.peer_epochs
.entry(node)
.or_insert_with(<D::Message as Epoched>::Epoch::default);
}
}
max_epoch
});
if new_epoch != self.epoch {
self.epoch = new_epoch;
// Announce the new epoch.
Target::All
.message(Message::EpochStarted(self.epoch))
.into()
} else {
Step::default()
}
}
/// Removes any messages to nodes at earlier epochs from the given `Step`. This may involve
/// decomposing a `Target::All` message into `Target::Node` messages and sending some of the
/// resulting messages while placing onto the queue those remaining messages whose recipient is
/// currently at an earlier epoch.
fn defer_messages(&mut self, step: &mut ::Step<D>) {
let max_future_epochs = self.algo.max_future_epochs();
// Append the deferred messages onto the queues.
for (id, message) in step.defer_messages(&self.peer_epochs, max_future_epochs) {
let epoch = message.epoch();
self.outgoing_queue
.entry((id, epoch))
.or_insert_with(Vec::new)
.push(message);
}
}
/// Returns a reference to the managed algorithm.
pub fn algo(&self) -> &D {
&self.algo
}
}
/// A builder of an algorithm instance wrapped in a sender queue. It configures the parameters and creates a new
/// instance of `SenderQueue`.
pub struct SenderQueueBuilder<D>
where
D: SenderQueueableDistAlgorithm,
D::Message: Epoched,
{
algo: D,
epoch: <D::Message as Epoched>::Epoch,
outgoing_queue: OutgoingQueue<D>,
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::Epoch>,
}
impl<D> SenderQueueBuilder<D>
where
D: SenderQueueableDistAlgorithm + Debug + Send + Sync,
D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
D::NodeId: NodeIdT + Rand,
D::Output: SenderQueueableOutput<D::NodeId, D::Message>,
<D::Message as Epoched>::Epoch: SenderQueueableEpoch,
{
pub fn new<I>(algo: D, peer_ids: I) -> Self
where
I: Iterator<Item = D::NodeId>,
{
SenderQueueBuilder {
algo,
epoch: <D::Message as Epoched>::Epoch::default(),
outgoing_queue: BTreeMap::default(),
peer_epochs: peer_ids
.map(|id| (id, <D::Message as Epoched>::Epoch::default()))
.collect(),
}
}
pub fn epoch(mut self, epoch: <D::Message as Epoched>::Epoch) -> Self {
self.epoch = epoch;
self
}
pub fn outgoing_queue(mut self, outgoing_queue: OutgoingQueue<D>) -> Self {
self.outgoing_queue = outgoing_queue;
self
}
pub fn peer_epochs(
mut self,
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::Epoch>,
) -> Self {
self.peer_epochs = peer_epochs;
self
}
pub fn build(self, our_id: D::NodeId) -> (SenderQueue<D>, Step<D>) {
let epoch = <D::Message as Epoched>::Epoch::default();
let sq = SenderQueue {
algo: self.algo,
our_id,
epoch: self.epoch,
outgoing_queue: self.outgoing_queue,
peer_epochs: self.peer_epochs,
};
let step: Step<D> = Target::All.message(Message::EpochStarted(epoch)).into();
(sq, step)
}
}

View File

@ -1,14 +1,17 @@
//! Common supertraits for distributed algorithms.
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::iter::once;
use failure::Fail;
use serde::Serialize;
use rand::Rand;
use serde::{de::DeserializeOwned, Serialize};
use fault_log::{Fault, FaultLog};
use TargetedMessage;
use sender_queue::SenderQueueableMessage;
use {Target, TargetedMessage};
/// A transaction, user message, or other user data.
pub trait Contribution: Eq + Debug + Hash + Send + Sync {}
@ -183,6 +186,120 @@ impl<D: DistAlgorithm> From<TargetedMessage<D::Message, D::NodeId>> for Step<D>
}
}
impl<D, I> From<I> for Step<D>
where
D: DistAlgorithm,
I: IntoIterator<Item = TargetedMessage<D::Message, D::NodeId>>,
{
fn from(msgs: I) -> Self {
Step {
messages: msgs.into_iter().collect(),
..Step::default()
}
}
}
/// An interface to objects with epoch numbers. Different algorithms may have different internal
/// notions of _epoch_. This interface summarizes the properties that are essential for the message
/// sender queue.
pub trait Epoched {
type Epoch: Copy + Message + Default + Eq + Ord + Serialize + DeserializeOwned;
/// Returns the object's epoch number.
fn epoch(&self) -> Self::Epoch;
}
impl<M: Epoched, N> Epoched for TargetedMessage<M, N> {
type Epoch = <M as Epoched>::Epoch;
fn epoch(&self) -> Self::Epoch {
self.message.epoch()
}
}
impl<'i, D> Step<D>
where
D: DistAlgorithm,
<D as DistAlgorithm>::NodeId: NodeIdT + Rand,
<D as DistAlgorithm>::Message:
'i + Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
<D as DistAlgorithm>::Output: Epoched,
{
/// Removes and returns any messages that are not yet accepted by remote nodes according to the
/// mapping `peer_epochs`. This way the returned messages are postponed until later, and the
/// remaining messages can be sent to remote nodes without delay.
pub fn defer_messages(
&mut self,
peer_epochs: &'i BTreeMap<D::NodeId, <D::Message as Epoched>::Epoch>,
max_future_epochs: u64,
) -> Vec<(D::NodeId, D::Message)>
where
<D as DistAlgorithm>::NodeId: 'i,
{
let messages = &mut self.messages;
let (mut passed_msgs, failed_msgs): (Vec<_>, Vec<_>) =
messages
.drain(..)
.partition(|TargetedMessage { target, message }| match target {
Target::All => peer_epochs
.values()
.all(|&them| message.is_accepted(them, max_future_epochs)),
Target::Node(id) => peer_epochs
.get(&id)
.map_or(false, |&them| message.is_accepted(them, max_future_epochs)),
});
// `Target::All` messages contained in the result of the partitioning are analyzed further
// and each is split into two sets of point messages: those which can be sent without delay and
// those which should be postponed.
let remote_nodes: BTreeSet<&D::NodeId> = peer_epochs.keys().collect();
let mut deferred_msgs: Vec<(D::NodeId, D::Message)> = Vec::new();
for msg in failed_msgs {
let m = msg.message;
match msg.target {
Target::Node(id) => {
let defer = {
let lagging = |&them| {
!(m.is_accepted(them, max_future_epochs) || m.is_obsolete(them))
};
peer_epochs.get(&id).map_or(true, lagging)
};
if defer {
deferred_msgs.push((id, m));
}
}
Target::All => {
let isnt_earlier_epoch =
|&them| m.is_accepted(them, max_future_epochs) || m.is_obsolete(them);
let lagging = |them| !isnt_earlier_epoch(them);
let accepts = |&them| m.is_accepted(them, max_future_epochs);
let accepting_nodes: BTreeSet<&D::NodeId> = peer_epochs
.iter()
.filter(|(_, them)| accepts(them))
.map(|(id, _)| id)
.collect();
let non_lagging_nodes: BTreeSet<&D::NodeId> = peer_epochs
.iter()
.filter(|(_, them)| isnt_earlier_epoch(them))
.map(|(id, _)| id)
.collect();
for &id in &accepting_nodes {
passed_msgs.push(Target::Node(id.clone()).message(m.clone()));
}
let lagging_nodes: BTreeSet<_> =
remote_nodes.difference(&non_lagging_nodes).collect();
for &id in lagging_nodes {
if peer_epochs.get(id).map_or(true, lagging) {
deferred_msgs.push((id.clone(), m.clone()));
}
}
}
}
}
messages.extend(passed_msgs);
deferred_msgs
}
}
/// A distributed algorithm that defines a message flow.
pub trait DistAlgorithm: Send + Sync {
/// Unique node identifier.

View File

@ -13,6 +13,7 @@ extern crate threshold_crypto as crypto;
mod network;
use std::collections::BTreeMap;
use std::iter;
use std::sync::Arc;
use itertools::Itertools;
@ -22,12 +23,13 @@ use rand::{Isaac64Rng, Rng};
use hbbft::dynamic_honey_badger::{
Batch, Change, ChangeState, DynamicHoneyBadger, Input, NodeChange,
};
use hbbft::sender_queue::{SenderQueue, Step};
use hbbft::transaction_queue::TransactionQueue;
use hbbft::NetworkInfo;
use network::{Adversary, MessageScheduler, NodeId, SilentAdversary, TestNetwork, TestNode};
type UsizeDhb = DynamicHoneyBadger<Vec<usize>, NodeId>;
type UsizeDhb = SenderQueue<DynamicHoneyBadger<Vec<usize>, NodeId>>;
/// Proposes `num_txs` values and expects nodes to output and order them.
fn test_dynamic_honey_badger<A>(mut network: TestNetwork<A, UsizeDhb>, num_txs: usize)
@ -80,8 +82,8 @@ where
.iter()
.filter(|(_, node)| {
node_busy(*node)
&& !node.instance().has_input()
&& node.instance().netinfo().is_validator()
&& !node.instance().algo().has_input()
&& node.instance().algo().netinfo().is_validator()
// Wait until all nodes have completed removing 0, before inputting `Add`.
&& (input_add || !has_remove(node))
// If there's only one node, it will immediately output on input. Make sure we
@ -99,6 +101,7 @@ where
if !input_add && network.nodes.values().all(has_remove) {
let pk = network.nodes[&NodeId(0)]
.instance()
.algo()
.netinfo()
.secret_key()
.public_key();
@ -114,8 +117,20 @@ where
// Allow passing `netinfo` by value. `TestNetwork` expects this function signature.
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
fn new_dynamic_hb(netinfo: Arc<NetworkInfo<NodeId>>) -> UsizeDhb {
DynamicHoneyBadger::builder().build((*netinfo).clone())
fn new_dynamic_hb(
netinfo: Arc<NetworkInfo<NodeId>>,
) -> (UsizeDhb, Step<DynamicHoneyBadger<Vec<usize>, NodeId>>) {
let observer = NodeId(netinfo.num_nodes());
let our_id = *netinfo.our_id();
let peer_ids = netinfo
.all_ids()
.filter(|&&them| them != our_id)
.cloned()
.chain(iter::once(observer));
SenderQueue::builder(
DynamicHoneyBadger::builder().build((*netinfo).clone()),
peer_ids,
).build(our_id)
}
fn test_dynamic_honey_badger_different_sizes<A, F>(new_adversary: F, num_txs: usize)
@ -137,7 +152,8 @@ where
num_good_nodes, num_adv_nodes
);
let adversary = |adv_nodes| new_adversary(num_good_nodes, num_adv_nodes, adv_nodes);
let network = TestNetwork::new(num_good_nodes, num_adv_nodes, adversary, new_dynamic_hb);
let network =
TestNetwork::new_with_step(num_good_nodes, num_adv_nodes, adversary, new_dynamic_hb);
test_dynamic_honey_badger(network, num_txs);
}
}

View File

@ -14,22 +14,24 @@ extern crate threshold_crypto as crypto;
mod network;
use std::collections::BTreeMap;
use std::iter;
use std::sync::Arc;
use itertools::Itertools;
use log::info;
use rand::Rng;
use hbbft::honey_badger::{self, Batch, HoneyBadger, MessageContent};
use hbbft::honey_badger::{Batch, HoneyBadger, MessageContent};
use hbbft::sender_queue::{self, SenderQueue, Step};
use hbbft::transaction_queue::TransactionQueue;
use hbbft::{threshold_decryption, NetworkInfo, Target, TargetedMessage};
use hbbft::{threshold_decryption, DistAlgorithm, Epoched, NetworkInfo, Target, TargetedMessage};
use network::{
Adversary, MessageScheduler, MessageWithSender, NodeId, RandomAdversary, SilentAdversary,
TestNetwork, TestNode,
};
type UsizeHoneyBadger = HoneyBadger<Vec<usize>, NodeId>;
type UsizeHoneyBadger = SenderQueue<HoneyBadger<Vec<usize>, NodeId>>;
/// An adversary whose nodes only send messages with incorrect decryption shares.
pub struct FaultyShareAdversary {
@ -66,7 +68,7 @@ impl Adversary<UsizeHoneyBadger> for FaultyShareAdversary {
fn push_message(
&mut self,
sender_id: NodeId,
msg: TargetedMessage<honey_badger::Message<NodeId>, NodeId>,
msg: TargetedMessage<<UsizeHoneyBadger as DistAlgorithm>::Message, NodeId>,
) {
let NodeId(sender_id) = sender_id;
if sender_id < self.num_good {
@ -105,12 +107,12 @@ impl Adversary<UsizeHoneyBadger> for FaultyShareAdversary {
for proposer_id in 0..self.num_good + self.num_adv {
outgoing.push(MessageWithSender::new(
NodeId(sender_id),
Target::All.message(
Target::All.message(sender_queue::Message::Algo(
MessageContent::DecryptionShare {
proposer_id: NodeId(proposer_id),
share: threshold_decryption::Message(share.clone()),
}.with_epoch(*epoch),
),
)),
))
}
}
@ -142,7 +144,7 @@ where
let input_ids: Vec<_> = network
.nodes
.iter()
.filter(|(_, node)| !node.instance().has_input())
.filter(|(_, node)| !node.instance().algo().has_input())
.map(|(id, _)| *id)
.collect();
if let Some(id) = rng.choose(&input_ids) {
@ -181,8 +183,18 @@ where
}
}
fn new_honey_badger(netinfo: Arc<NetworkInfo<NodeId>>) -> UsizeHoneyBadger {
HoneyBadger::builder(netinfo).build()
fn new_honey_badger(
netinfo: Arc<NetworkInfo<NodeId>>,
) -> (UsizeHoneyBadger, Step<HoneyBadger<Vec<usize>, NodeId>>) {
let our_id = *netinfo.our_id();
let observer = NodeId(netinfo.num_nodes());
let nc = netinfo.clone();
let peer_ids = nc
.all_ids()
.filter(|&&them| them != our_id)
.cloned()
.chain(iter::once(observer));
SenderQueue::builder(HoneyBadger::builder(netinfo).build(), peer_ids).build(our_id)
}
fn test_honey_badger_different_sizes<A, F>(new_adversary: F, num_txs: usize)
@ -203,7 +215,8 @@ where
num_good_nodes, num_adv_nodes
);
let adversary = |adv_nodes| new_adversary(num_good_nodes, num_adv_nodes, adv_nodes);
let network = TestNetwork::new(num_good_nodes, num_adv_nodes, adversary, new_honey_badger);
let network =
TestNetwork::new_with_step(num_good_nodes, num_adv_nodes, adversary, new_honey_badger);
test_honey_badger(network, num_txs);
}
}

View File

@ -10,10 +10,11 @@ pub mod net;
use std::{collections, time};
use hbbft::dynamic_honey_badger::{Change, ChangeState, DynamicHoneyBadger, Input, NodeChange};
use hbbft::DistAlgorithm;
use hbbft::sender_queue::SenderQueue;
use hbbft::Epoched;
use net::adversary::ReorderingAdversary;
use net::proptest::{gen_seed, NetworkDimension, TestRng, TestRngSeed};
use net::NetBuilder;
use net::{NetBuilder, NewNodeInfo};
use proptest::{prelude::ProptestConfig, prop_compose, proptest, proptest_helper};
use rand::{Rng, SeedableRng};
@ -102,11 +103,16 @@ fn do_drop_and_readd(cfg: TestConfig) {
// Ensure runs are reproducible.
.rng(rng.gen::<TestRng>())
.adversary(ReorderingAdversary::new(rng.gen::<TestRng>()))
.using(move |node| {
println!("Constructing new dynamic honey badger node #{}", node.id);
DynamicHoneyBadger::builder()
.using_step(move |node: NewNodeInfo<SenderQueue<_>>| {
let id = node.id;
println!("Constructing new dynamic honey badger node #{}", id);
let dhb = DynamicHoneyBadger::builder()
.rng(node.rng)
.build(node.netinfo)
.build(node.netinfo.clone());
SenderQueue::builder(
dhb,
node.netinfo.all_ids().filter(|&&them| them != id).cloned(),
).build(node.id)
}).build()
.expect("could not construct test network");
@ -166,6 +172,7 @@ fn do_drop_and_readd(cfg: TestConfig) {
// Now we can add the node again. Public keys will be reused.
let pk = net[*pivot_node_id]
.algorithm()
.algo()
.netinfo()
.secret_key()
.public_key();
@ -205,7 +212,7 @@ fn do_drop_and_readd(cfg: TestConfig) {
// Examine potential algorithm output.
for batch in step.output {
println!(
"Received epoch {} batch on node {:?}.",
"Received epoch {:?} batch on node {:?}.",
batch.epoch(),
node_id,
);

View File

@ -13,6 +13,7 @@ extern crate threshold_crypto as crypto;
mod network;
use std::collections::BTreeMap;
use std::iter;
use std::sync::Arc;
use itertools::Itertools;
@ -21,13 +22,14 @@ use rand::{Isaac64Rng, Rng};
use hbbft::dynamic_honey_badger::DynamicHoneyBadger;
use hbbft::queueing_honey_badger::{
Batch, Change, ChangeState, Input, NodeChange, QueueingHoneyBadger, Step,
Batch, Change, ChangeState, Input, NodeChange, QueueingHoneyBadger,
};
use hbbft::sender_queue::{Message, SenderQueue, Step};
use hbbft::NetworkInfo;
use network::{Adversary, MessageScheduler, NodeId, SilentAdversary, TestNetwork, TestNode};
type QHB = QueueingHoneyBadger<usize, NodeId, Vec<usize>>;
type QHB = SenderQueue<QueueingHoneyBadger<usize, NodeId, Vec<usize>>>;
/// Proposes `num_txs` values and expects nodes to output and order them.
fn test_queueing_honey_badger<A>(mut network: TestNetwork<A, QHB>, num_txs: usize)
@ -80,6 +82,7 @@ where
}
let pk = network.nodes[&NodeId(0)]
.instance()
.algo()
.dyn_hb()
.netinfo()
.secret_key()
@ -96,12 +99,22 @@ where
// Allow passing `netinfo` by value. `TestNetwork` expects this function signature.
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
fn new_queueing_hb(netinfo: Arc<NetworkInfo<NodeId>>) -> (QHB, Step<usize, NodeId, Vec<usize>>) {
let dyn_hb = DynamicHoneyBadger::builder().build((*netinfo).clone());
fn new_queueing_hb(
netinfo: Arc<NetworkInfo<NodeId>>,
) -> (QHB, Step<QueueingHoneyBadger<usize, NodeId, Vec<usize>>>) {
let observer = NodeId(netinfo.num_nodes());
let our_id = *netinfo.our_id();
let peer_ids = netinfo
.all_ids()
.filter(|&&them| them != our_id)
.cloned()
.chain(iter::once(observer));
let dhb = DynamicHoneyBadger::builder().build((*netinfo).clone());
let rng = rand::thread_rng().gen::<Isaac64Rng>();
QueueingHoneyBadger::builder(dyn_hb)
.batch_size(3)
.build(rng)
let (qhb, qhb_step) = QueueingHoneyBadger::builder(dhb).batch_size(3).build(rng);
let (sq, mut step) = SenderQueue::builder(qhb, peer_ids).build(our_id);
step.extend_with(qhb_step, Message::from);
(sq, step)
}
fn test_queueing_honey_badger_different_sizes<A, F>(new_adversary: F, num_txs: usize)