From ee46dd4b81988dd1e111fa77767b5d1283bc9f7d Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Thu, 25 Oct 2018 16:07:52 +0100 Subject: [PATCH] sender queue implementation --- examples/simulation.rs | 24 +- src/dynamic_honey_badger/batch.rs | 38 +- src/dynamic_honey_badger/builder.rs | 37 +- .../dynamic_honey_badger.rs | 91 ++--- src/dynamic_honey_badger/mod.rs | 63 ++- src/dynamic_honey_badger/sender_queueable.rs | 75 ++++ src/fault_log.rs | 8 +- src/honey_badger/batch.rs | 11 +- src/honey_badger/builder.rs | 5 +- src/honey_badger/honey_badger.rs | 52 +-- src/honey_badger/message.rs | 8 +- src/honey_badger/mod.rs | 2 + src/honey_badger/sender_queueable.rs | 53 +++ src/lib.rs | 3 +- .../mod.rs} | 8 +- src/queueing_honey_badger/sender_queueable.rs | 18 + src/sender_queue/message.rs | 46 +++ src/sender_queue/mod.rs | 361 ++++++++++++++++++ src/traits.rs | 121 +++++- tests/dynamic_honey_badger.rs | 28 +- tests/honey_badger.rs | 33 +- tests/net_dynamic_hb.rs | 21 +- tests/queueing_honey_badger.rs | 27 +- 23 files changed, 966 insertions(+), 167 deletions(-) create mode 100644 src/dynamic_honey_badger/sender_queueable.rs create mode 100644 src/honey_badger/sender_queueable.rs rename src/{queueing_honey_badger.rs => queueing_honey_badger/mod.rs} (97%) create mode 100644 src/queueing_honey_badger/sender_queueable.rs create mode 100644 src/sender_queue/message.rs create mode 100644 src/sender_queue/mod.rs diff --git a/examples/simulation.rs b/examples/simulation.rs index 9d694fa..da8ab51 100644 --- a/examples/simulation.rs +++ b/examples/simulation.rs @@ -26,6 +26,7 @@ use signifix::{metric, TryFrom}; use hbbft::dynamic_honey_badger::DynamicHoneyBadger; use hbbft::queueing_honey_badger::{Batch, QueueingHoneyBadger}; +use hbbft::sender_queue::{Message, SenderQueue}; use hbbft::{DistAlgorithm, NetworkInfo, Step, Target}; const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -347,6 +348,8 @@ struct EpochInfo { nodes: BTreeMap)>, } +type QHB = SenderQueue>>; + impl EpochInfo { /// Adds a batch to this epoch. Prints information if the epoch is complete. fn add( @@ -354,7 +357,7 @@ impl EpochInfo { id: NodeId, time: Duration, batch: &Batch, - network: &TestNetwork>>, + network: &TestNetwork, ) { if self.nodes.contains_key(&id) { return; @@ -373,7 +376,7 @@ impl EpochInfo { let txs = batch.iter().unique().count(); println!( "{:>5} {:6} {:6} {:5} {:9} {:>9}B", - batch.epoch().to_string().cyan(), + batch.seqnum().to_string().cyan(), min_t.as_secs() * 1000 + u64::from(max_t.subsec_nanos()) / 1_000_000, max_t.as_secs() * 1000 + u64::from(max_t.subsec_nanos()) / 1_000_000, txs, @@ -385,9 +388,7 @@ impl EpochInfo { } /// Proposes `num_txs` values and expects nodes to output and order them. -fn simulate_honey_badger( - mut network: TestNetwork>>, -) { +fn simulate_honey_badger(mut network: TestNetwork) { // Handle messages until all nodes have output all transactions. println!( "{}", @@ -396,7 +397,7 @@ fn simulate_honey_badger( let mut epochs = Vec::new(); while let Some(id) = network.step() { for &(time, ref batch) in &network.nodes[&id].outputs { - let epoch = batch.epoch() as usize; + let epoch = batch.seqnum() as usize; if epochs.len() <= epoch { epochs.resize(epoch + 1, EpochInfo::default()); } @@ -436,11 +437,16 @@ fn main() { .map(|_| Transaction::new(args.flag_tx_size)) .collect(); let new_honey_badger = |netinfo: NetworkInfo| { - let dyn_hb = DynamicHoneyBadger::builder().build(netinfo); - QueueingHoneyBadger::builder(dyn_hb) + let dhb = DynamicHoneyBadger::builder().build(netinfo.clone()); + let (qhb, qhb_step) = QueueingHoneyBadger::builder(dhb) .batch_size(args.flag_b) .build_with_transactions(txs.clone(), rand::thread_rng().gen::()) - .expect("instantiate QueueingHoneyBadger") + .expect("instantiate QueueingHoneyBadger"); + let our_id = *netinfo.our_id(); + let peer_ids = netinfo.all_ids().filter(|&&them| them != our_id).cloned(); + let (sq, mut step) = SenderQueue::builder(qhb, peer_ids).build(our_id); + step.extend_with(qhb_step, Message::from); + (sq, step) }; let hw_quality = HwQuality { latency: Duration::from_millis(args.flag_lag), diff --git a/src/dynamic_honey_badger/batch.rs b/src/dynamic_honey_badger/batch.rs index b1e9fb5..88784e5 100644 --- a/src/dynamic_honey_badger/batch.rs +++ b/src/dynamic_honey_badger/batch.rs @@ -2,14 +2,16 @@ use std::collections::BTreeMap; use std::sync::Arc; use super::EncryptionSchedule; -use super::{ChangeState, JoinPlan}; -use {NetworkInfo, NodeIdT}; +use super::{ChangeState, Epoch, JoinPlan}; +use {Epoched, NetworkInfo, NodeIdT}; /// A batch of transactions the algorithm has output. #[derive(Clone, Debug)] pub struct Batch { /// The sequence number: there is exactly one batch in each epoch. - pub(super) epoch: u64, + pub(super) seqnum: u64, + /// The current `DynamicHoneyBadger` era. + pub(super) era: u64, /// The user contributions committed in this epoch. pub(super) contributions: BTreeMap, /// The current state of adding or removing a node: whether any is in progress, or completed @@ -21,9 +23,30 @@ pub struct Batch { pub(super) encryption_schedule: EncryptionSchedule, } +impl Epoched for Batch { + type Epoch = Epoch; + + /// Returns the **next** `DynamicHoneyBadger` epoch after the sequential epoch of the batch. + fn epoch(&self) -> Epoch { + let seqnum = self.seqnum; + let era = self.era; + if self.change == ChangeState::None { + Epoch(era, Some(seqnum - era + 1)) + } else { + Epoch(seqnum + 1, Some(0)) + } + } +} + impl Batch { - pub fn epoch(&self) -> u64 { - self.epoch + /// Returns the linear epoch of this `DynamicHoneyBadger` batch. + pub fn seqnum(&self) -> u64 { + self.seqnum + } + + /// Returns the `DynamicHoneyBadger` era of the batch. + pub fn era(&self) -> u64 { + self.era } /// Returns whether any change to the set of participating nodes is in progress or was @@ -89,7 +112,7 @@ impl Batch { return None; } Some(JoinPlan { - epoch: self.epoch + 1, + era: self.seqnum + 1, change: self.change.clone(), pub_key_set: self.netinfo.public_key_set().clone(), pub_keys: self.netinfo.public_key_map().clone(), @@ -103,7 +126,8 @@ impl Batch { where C: PartialEq, { - self.epoch == other.epoch + self.seqnum == other.seqnum + && self.era == other.era && self.contributions == other.contributions && self.change == other.change && self.netinfo.public_key_set() == other.netinfo.public_key_set() diff --git a/src/dynamic_honey_badger/builder.rs b/src/dynamic_honey_badger/builder.rs index f8b9f71..66266b9 100644 --- a/src/dynamic_honey_badger/builder.rs +++ b/src/dynamic_honey_badger/builder.rs @@ -16,10 +16,10 @@ use {Contribution, NetworkInfo, NodeIdT}; /// A Dynamic Honey Badger builder, to configure the parameters and create new instances of /// `DynamicHoneyBadger`. pub struct DynamicHoneyBadgerBuilder { - /// Start in this epoch. - epoch: u64, + /// Start in this era. + era: u64, /// The maximum number of future epochs for which we handle messages simultaneously. - max_future_epochs: usize, + max_future_epochs: u64, /// Random number generator passed on to algorithm instance for key generation. Also used to /// instantiate `HoneyBadger`. rng: Box, @@ -30,11 +30,14 @@ pub struct DynamicHoneyBadgerBuilder { _phantom: PhantomData<(C, N)>, } -impl Default for DynamicHoneyBadgerBuilder { +impl Default for DynamicHoneyBadgerBuilder +where + N: Ord, +{ fn default() -> Self { // TODO: Use the defaults from `HoneyBadgerBuilder`. DynamicHoneyBadgerBuilder { - epoch: 0, + era: 0, max_future_epochs: 3, rng: Box::new(rand::thread_rng()), subset_handling_strategy: SubsetHandlingStrategy::Incremental, @@ -55,14 +58,14 @@ where Self::default() } - /// Sets the starting epoch to the given value. - pub fn epoch(&mut self, epoch: u64) -> &mut Self { - self.epoch = epoch; + /// Sets the starting era to the given value. + pub fn era(&mut self, era: u64) -> &mut Self { + self.era = era; self } /// Sets the maximum number of future epochs for which we handle messages simultaneously. - pub fn max_future_epochs(&mut self, max_future_epochs: usize) -> &mut Self { + pub fn max_future_epochs(&mut self, max_future_epochs: u64) -> &mut Self { self.max_future_epochs = max_future_epochs; self } @@ -91,18 +94,18 @@ where /// Creates a new Dynamic Honey Badger instance with an empty buffer. pub fn build(&mut self, netinfo: NetworkInfo) -> DynamicHoneyBadger { let DynamicHoneyBadgerBuilder { - epoch, + era, max_future_epochs, rng, subset_handling_strategy, encryption_schedule, _phantom, } = self; - let epoch = *epoch; + let era = *era; let max_future_epochs = *max_future_epochs; let arc_netinfo = Arc::new(netinfo.clone()); let honey_badger = HoneyBadger::builder(arc_netinfo.clone()) - .session_id(epoch) + .session_id(era) .max_future_epochs(max_future_epochs) .rng(rng.sub_rng()) .subset_handling_strategy(subset_handling_strategy.clone()) @@ -111,12 +114,11 @@ where DynamicHoneyBadger { netinfo, max_future_epochs, - start_epoch: epoch, + era, vote_counter: VoteCounter::new(arc_netinfo, 0), key_gen_msg_buffer: Vec::new(), honey_badger, key_gen_state: None, - incoming_queue: Vec::new(), rng: Box::new(rng.sub_rng()), } } @@ -155,17 +157,16 @@ where let mut dhb = DynamicHoneyBadger { netinfo, max_future_epochs: self.max_future_epochs, - start_epoch: join_plan.epoch, - vote_counter: VoteCounter::new(arc_netinfo, join_plan.epoch), + era: join_plan.era, + vote_counter: VoteCounter::new(arc_netinfo, join_plan.era), key_gen_msg_buffer: Vec::new(), honey_badger, key_gen_state: None, - incoming_queue: Vec::new(), rng: Box::new(self.rng.sub_rng()), }; let step = match join_plan.change { ChangeState::InProgress(ref change) => match change { - Change::NodeChange(change) => dhb.update_key_gen(join_plan.epoch, change)?, + Change::NodeChange(change) => dhb.update_key_gen(join_plan.era, change)?, _ => Step::default(), }, ChangeState::None | ChangeState::Complete(..) => Step::default(), diff --git a/src/dynamic_honey_badger/dynamic_honey_badger.rs b/src/dynamic_honey_badger/dynamic_honey_badger.rs index b043662..2f20fdb 100644 --- a/src/dynamic_honey_badger/dynamic_honey_badger.rs +++ b/src/dynamic_honey_badger/dynamic_honey_badger.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; use std::sync::Arc; -use std::{fmt, mem, result}; +use std::{fmt, result}; use bincode; use crypto::Signature; @@ -30,9 +30,9 @@ pub struct DynamicHoneyBadger { /// Shared network data. pub(super) netinfo: NetworkInfo, /// The maximum number of future epochs for which we handle messages simultaneously. - pub(super) max_future_epochs: usize, + pub(super) max_future_epochs: u64, /// The first epoch after the latest node change. - pub(super) start_epoch: u64, + pub(super) era: u64, /// The buffer and counter for the pending and committed change votes. pub(super) vote_counter: VoteCounter, /// Pending node transactions that we will propose in the next epoch. @@ -41,8 +41,6 @@ pub struct DynamicHoneyBadger { pub(super) honey_badger: HoneyBadger, N>, /// The current key generation process, and the change it applies to. pub(super) key_gen_state: Option>, - /// A queue for messages from future epochs that cannot be handled yet. - pub(super) incoming_queue: Vec<(N, Message)>, /// A random number generator used for secret key generation. // Boxed to avoid overloading the algorithm's type with more generics. #[derivative(Debug(format_with = "util::fmt_rng"))] @@ -107,7 +105,7 @@ where let key_gen_messages = self .key_gen_msg_buffer .iter() - .filter(|kg_msg| kg_msg.epoch() == self.start_epoch) + .filter(|kg_msg| kg_msg.era() == self.era) .cloned() .collect(); let step = self @@ -137,16 +135,7 @@ where /// /// This must be called with every message we receive from another node. pub fn handle_message(&mut self, sender_id: &N, message: Message) -> Result> { - let epoch = message.start_epoch(); - if epoch < self.start_epoch { - // Obsolete message. - Ok(Step::default()) - } else if epoch > self.start_epoch { - // Message cannot be handled yet. Save it for later. - let entry = (sender_id.clone(), message); - self.incoming_queue.push(entry); - Ok(Step::default()) - } else { + if message.era() == self.era { match message { Message::HoneyBadger(_, hb_msg) => { self.handle_honey_badger_message(sender_id, hb_msg) @@ -159,6 +148,11 @@ where .add_pending_vote(sender_id, signed_vote) .map(FaultLog::into), } + } else if message.era() > self.era { + Ok(Fault::new(sender_id.clone(), FaultKind::UnexpectedDhbMessageEra).into()) + } else { + // The message is late; discard it. + Ok(Step::default()) } } @@ -238,7 +232,7 @@ where return Ok(Fault::new(sender_id.clone(), fault_kind).into()); } - let tx = SignedKeyGenMsg(self.start_epoch, sender_id.clone(), kg_msg, sig); + let tx = SignedKeyGenMsg(self.era, sender_id.clone(), kg_msg, sig); self.key_gen_msg_buffer.push(tx); Ok(FaultLog::default()) } @@ -249,10 +243,10 @@ where hb_step: honey_badger::Step, N>, ) -> Result> { let mut step: Step = Step::default(); - let start_epoch = self.start_epoch; - let output = step.extend_with(hb_step, |hb_msg| Message::HoneyBadger(start_epoch, hb_msg)); + let output = step.extend_with(hb_step, |hb_msg| Message::HoneyBadger(self.era, hb_msg)); for hb_batch in output { - let batch_epoch = hb_batch.epoch + self.start_epoch; + let batch_era = self.era; + let batch_seqnum = hb_batch.epoch + batch_era; let mut batch_contributions = BTreeMap::new(); // Add the user transactions to `batch` and handle votes and DKG messages. @@ -267,9 +261,9 @@ where batch_contributions.insert(id.clone(), contrib); self.key_gen_msg_buffer .retain(|skgm| !key_gen_messages.contains(skgm)); - for SignedKeyGenMsg(epoch, s_id, kg_msg, sig) in key_gen_messages { - if epoch != self.start_epoch { - let fault_kind = FaultKind::InvalidKeyGenMessageEpoch; + for SignedKeyGenMsg(era, s_id, kg_msg, sig) in key_gen_messages { + if era != self.era { + let fault_kind = FaultKind::InvalidKeyGenMessageEra; step.fault_log.append(id.clone(), fault_kind); } else if !self.verify_signature(&s_id, &sig, &kg_msg)? { let fault_kind = FaultKind::InvalidKeyGenMessageSignature; @@ -282,19 +276,18 @@ where } } } - let change = if let Some(kgs) = self.take_ready_key_gen() { // If DKG completed, apply the change, restart Honey Badger, and inform the user. debug!("{}: DKG for {:?} complete!", self, kgs.change); self.netinfo = kgs.key_gen.into_network_info()?; - self.restart_honey_badger(batch_epoch + 1, None); + self.restart_honey_badger(batch_seqnum + 1, None); ChangeState::Complete(Change::NodeChange(kgs.change)) } else if let Some(change) = self.vote_counter.compute_winner().cloned() { // If there is a new change, restart DKG. Inform the user about the current change. step.extend(match &change { - Change::NodeChange(change) => self.update_key_gen(batch_epoch + 1, &change)?, + Change::NodeChange(change) => self.update_key_gen(batch_seqnum + 1, &change)?, Change::EncryptionSchedule(schedule) => { - self.update_encryption_schedule(batch_epoch + 1, *schedule)? + self.update_encryption_schedule(batch_seqnum + 1, *schedule)? } }); match change { @@ -305,29 +298,23 @@ where ChangeState::None }; step.output.push(Batch { - epoch: batch_epoch, + seqnum: batch_seqnum, + era: batch_era, change, netinfo: Arc::new(self.netinfo.clone()), contributions: batch_contributions, encryption_schedule: self.honey_badger.get_encryption_schedule(), }); } - // If `start_epoch` changed, we can now handle some queued messages. - if start_epoch < self.start_epoch { - let queue = mem::replace(&mut self.incoming_queue, Vec::new()); - for (sender_id, msg) in queue { - step.extend(self.handle_message(&sender_id, msg)?); - } - } Ok(step) } pub(super) fn update_encryption_schedule( &mut self, - epoch: u64, + era: u64, encryption_schedule: EncryptionSchedule, ) -> Result> { - self.restart_honey_badger(epoch, Some(encryption_schedule)); + self.restart_honey_badger(era, Some(encryption_schedule)); Ok(Step::default()) } @@ -335,7 +322,7 @@ where /// by the current change. pub(super) fn update_key_gen( &mut self, - epoch: u64, + era: u64, change: &NodeChange, ) -> Result> { if self.key_gen_state.as_ref().map(|kgs| &kgs.change) == Some(change) { @@ -350,7 +337,7 @@ where } { warn!("{}: No-op change: {:?}", self, change); } - self.restart_honey_badger(epoch, None); + self.restart_honey_badger(era, None); // TODO: This needs to be the same as `num_faulty` will be in the _new_ // `NetworkInfo` if the change goes through. It would be safer to deduplicate. let threshold = (pub_keys.len() - 1) / 3; @@ -366,17 +353,13 @@ where } /// Starts a new `HoneyBadger` instance and resets the vote counter. - fn restart_honey_badger( - &mut self, - epoch: u64, - encryption_schedule: Option, - ) { - self.start_epoch = epoch; - self.key_gen_msg_buffer.retain(|kg_msg| kg_msg.0 >= epoch); + fn restart_honey_badger(&mut self, era: u64, encryption_schedule: Option) { + self.era = era; + self.key_gen_msg_buffer.retain(|kg_msg| kg_msg.0 >= era); let netinfo = Arc::new(self.netinfo.clone()); - self.vote_counter = VoteCounter::new(netinfo.clone(), epoch); + self.vote_counter = VoteCounter::new(netinfo.clone(), era); self.honey_badger = HoneyBadger::builder(netinfo) - .session_id(epoch) + .session_id(era) .max_future_epochs(self.max_future_epochs) .rng(self.rng.sub_rng()) .encryption_schedule( @@ -434,11 +417,10 @@ where let sig = Box::new(self.netinfo.secret_key().sign(ser)); if self.netinfo.is_validator() { let our_id = self.our_id().clone(); - let signed_msg = - SignedKeyGenMsg(self.start_epoch, our_id, kg_msg.clone(), *sig.clone()); + let signed_msg = SignedKeyGenMsg(self.era, our_id, kg_msg.clone(), *sig.clone()); self.key_gen_msg_buffer.push(signed_msg); } - let msg = Message::KeyGen(self.start_epoch, kg_msg, sig); + let msg = Message::KeyGen(self.era, kg_msg, sig); Ok(Target::All.message(msg).into()) } @@ -479,6 +461,11 @@ where let pk_opt = self.netinfo.public_key(node_id).or_else(get_candidate_key); Ok(pk_opt.map_or(false, |pk| pk.verify(&sig, ser))) } + + /// Returns the maximum future epochs of the Honey Badger algorithm instance. + pub fn max_future_epochs(&self) -> u64 { + self.max_future_epochs + } } impl fmt::Display for DynamicHoneyBadger @@ -487,6 +474,6 @@ where N: NodeIdT + Serialize + DeserializeOwned + Rand, { fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> { - write!(f, "{:?} DHB(era: {})", self.our_id(), self.start_epoch) + write!(f, "{:?} DHB(era: {})", self.our_id(), self.era) } } diff --git a/src/dynamic_honey_badger/mod.rs b/src/dynamic_honey_badger/mod.rs index 5d53839..39f96cb 100644 --- a/src/dynamic_honey_badger/mod.rs +++ b/src/dynamic_honey_badger/mod.rs @@ -62,16 +62,20 @@ mod dynamic_honey_badger; mod error; mod votes; +pub mod sender_queueable; + +use std::cmp::Ordering; +use std::collections::BTreeMap; + use crypto::{PublicKey, PublicKeySet, Signature}; use rand::Rand; use serde_derive::{Deserialize, Serialize}; -use std::collections::BTreeMap; use self::votes::{SignedVote, VoteCounter}; use super::threshold_decryption::EncryptionSchedule; use honey_badger::Message as HbMessage; use sync_key_gen::{Ack, Part, SyncKeyGen}; -use NodeIdT; +use {Epoched, NodeIdT}; pub use self::batch::Batch; pub use self::builder::DynamicHoneyBadgerBuilder; @@ -113,19 +117,54 @@ pub enum Message { } impl Message { - fn start_epoch(&self) -> u64 { + fn era(&self) -> u64 { match *self { - Message::HoneyBadger(epoch, _) => epoch, - Message::KeyGen(epoch, _, _) => epoch, + Message::HoneyBadger(era, _) => era, + Message::KeyGen(era, _, _) => era, Message::SignedVote(ref signed_vote) => signed_vote.era(), } } +} - pub fn epoch(&self) -> u64 { +/// Dynamic Honey Badger epoch. It consists of an era and an epoch of Honey Badger that started in +/// that era. For messages originating from `DynamicHoneyBadger` as opposed to `HoneyBadger`, that +/// HoneyBadger epoch is `None`. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, Serialize, Deserialize)] +pub struct Epoch(pub(super) u64, pub(super) Option); + +impl PartialOrd for Epoch { + /// Partial ordering on epochs. For any `era` and `hb_epoch`, two epochs `Epoch(era, None)` and `Epoch(era, + /// Some(hb_epoch))` are incomparable. + fn partial_cmp(&self, other: &Epoch) -> Option { + let (&Epoch(a, b), &Epoch(c, d)) = (self, other); + if a < c { + Some(Ordering::Less) + } else if a > c { + Some(Ordering::Greater) + } else if b.is_none() && d.is_none() { + Some(Ordering::Equal) + } else if let (Some(b), Some(d)) = (b, d) { + Some(Ord::cmp(&b, &d)) + } else { + None + } + } +} + +impl Default for Epoch { + fn default() -> Epoch { + Epoch(0, Some(0)) + } +} + +impl Epoched for Message { + type Epoch = Epoch; + + fn epoch(&self) -> Epoch { match *self { - Message::HoneyBadger(start_epoch, ref msg) => start_epoch + msg.epoch(), - Message::KeyGen(epoch, _, _) => epoch, - Message::SignedVote(ref signed_vote) => signed_vote.era(), + Message::HoneyBadger(era, ref msg) => Epoch(era, Some(msg.epoch())), + Message::KeyGen(era, _, _) => Epoch(era, None), + Message::SignedVote(ref signed_vote) => Epoch(signed_vote.era(), None), } } } @@ -136,7 +175,7 @@ impl Message { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct JoinPlan { /// The first epoch the new node will observe. - epoch: u64, + era: u64, /// The current change. If `InProgress`, key generation for it is beginning at `epoch`. change: ChangeState, /// The current public key set for threshold cryptography. @@ -208,8 +247,8 @@ struct InternalContrib { struct SignedKeyGenMsg(u64, N, KeyGenMessage, Signature); impl SignedKeyGenMsg { - /// Returns the start epoch of the ongoing key generation. - fn epoch(&self) -> u64 { + /// Returns the era of the ongoing key generation. + fn era(&self) -> u64 { self.0 } } diff --git a/src/dynamic_honey_badger/sender_queueable.rs b/src/dynamic_honey_badger/sender_queueable.rs new file mode 100644 index 0000000..396f18c --- /dev/null +++ b/src/dynamic_honey_badger/sender_queueable.rs @@ -0,0 +1,75 @@ +use log::error; +use rand::Rand; +use serde::{de::DeserializeOwned, Serialize}; + +use super::{Batch, Change, ChangeState, DynamicHoneyBadger, Epoch, Message, NodeChange}; +use sender_queue::{ + SenderQueueableDistAlgorithm, SenderQueueableEpoch, SenderQueueableMessage, + SenderQueueableOutput, +}; +use {Contribution, Epoched, NodeIdT}; + +impl SenderQueueableOutput> for Batch +where + C: Contribution, + N: NodeIdT + Rand, +{ + fn added_node(&self) -> Option { + if let ChangeState::InProgress(Change::NodeChange(NodeChange::Add(ref id, _))) = self.change + { + // Register the new node to send broadcast messages to it from now on. + Some(id.clone()) + } else { + None + } + } + + fn convert_epoch(&self) -> Epoch { + self.epoch() + } +} + +impl SenderQueueableMessage for Message +where + N: Rand, +{ + fn is_accepted(&self, Epoch(them_era, them_hb_epoch): Epoch, max_future_epochs: u64) -> bool { + let Epoch(era, hb_epoch) = self.epoch(); + if era != them_era { + return false; + } + match (hb_epoch, them_hb_epoch) { + (Some(us), Some(them)) => them <= us && us <= them + max_future_epochs, + (None, Some(_)) => true, + (_, None) => { + error!("Peer's Honey Badger epoch undefined"); + false + } + } + } + + fn is_obsolete(&self, Epoch(them_era, them_hb_epoch): Epoch) -> bool { + let Epoch(era, hb_epoch) = self.epoch(); + era < them_era || (era == them_era && hb_epoch.is_some() && hb_epoch < them_hb_epoch) + } +} + +impl SenderQueueableEpoch for Epoch { + fn spanning_epochs(&self) -> Vec { + if let Epoch(era, Some(_)) = *self { + vec![Epoch(era, None)] + } else { + vec![] + } + } +} + +impl SenderQueueableDistAlgorithm for DynamicHoneyBadger +where + C: Contribution + Serialize + DeserializeOwned, + N: NodeIdT + Serialize + DeserializeOwned + Rand, +{ + fn max_future_epochs(&self) -> u64 { + self.max_future_epochs() + } +} diff --git a/src/fault_log.rs b/src/fault_log.rs index f8b92d5..e7832e3 100644 --- a/src/fault_log.rs +++ b/src/fault_log.rs @@ -20,6 +20,8 @@ pub enum FaultKind { DeserializeCiphertext, /// `HoneyBadger` received an invalid ciphertext from the proposer. InvalidCiphertext, + /// `HoneyBadger` received a message with an invalid epoch. + UnexpectedHbMessageEpoch, /// `ThresholdDecryption` received multiple shares from the same sender. MultipleDecryptionShares, /// `Broadcast` received a `Value` from a node other than the proposer. @@ -39,8 +41,8 @@ pub enum FaultKind { BatchDeserializationFailed, /// `DynamicHoneyBadger` received a key generation message with an invalid signature. InvalidKeyGenMessageSignature, - /// `DynamicHoneyBadger` received a key generation message with an invalid epoch. - InvalidKeyGenMessageEpoch, + /// `DynamicHoneyBadger` received a key generation message with an invalid era. + InvalidKeyGenMessageEra, /// `DynamicHoneyBadger` received a key generation message when there was no key generation in /// progress. UnexpectedKeyGenMessage, @@ -61,6 +63,8 @@ pub enum FaultKind { InvalidVoteSignature, /// A validator committed an invalid vote in `DynamicHoneyBadger`. InvalidCommittedVote, + /// `DynamicHoneyBadger` received a message with an invalid era. + UnexpectedDhbMessageEra, /// `BinaryAgreement` received a duplicate `BVal` message. DuplicateBVal, /// `BinaryAgreement` received a duplicate `Aux` message. diff --git a/src/honey_badger/batch.rs b/src/honey_badger/batch.rs index d6d534f..bc8819f 100644 --- a/src/honey_badger/batch.rs +++ b/src/honey_badger/batch.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use NodeIdT; +use {Epoched, NodeIdT}; /// A batch of contributions the algorithm has output. #[derive(Clone, Debug)] @@ -9,6 +9,15 @@ pub struct Batch { pub contributions: BTreeMap, } +impl Epoched for Batch { + type Epoch = u64; + + /// Returns the **next** `HoneyBadger` epoch after the sequential epoch of the batch. + fn epoch(&self) -> u64 { + self.epoch + 1 + } +} + impl Batch { /// Returns an iterator over references to all transactions included in the batch. pub fn iter<'a>(&'a self) -> impl Iterator::Item> diff --git a/src/honey_badger/builder.rs b/src/honey_badger/builder.rs index d803ba9..4d2d139 100644 --- a/src/honey_badger/builder.rs +++ b/src/honey_badger/builder.rs @@ -21,7 +21,7 @@ pub struct HoneyBadgerBuilder { /// Start in this epoch. epoch: u64, /// The maximum number of future epochs for which we handle messages simultaneously. - max_future_epochs: usize, + max_future_epochs: u64, /// Random number generator passed on to algorithm instance for signing and encrypting. rng: Box, /// Strategy used to handle the output of the `Subset` algorithm. @@ -73,7 +73,7 @@ where } /// Sets the maximum number of future epochs for which we handle messages simultaneously. - pub fn max_future_epochs(&mut self, max_future_epochs: usize) -> &mut Self { + pub fn max_future_epochs(&mut self, max_future_epochs: u64) -> &mut Self { self.max_future_epochs = max_future_epochs; self } @@ -102,7 +102,6 @@ where has_input: false, epochs: BTreeMap::new(), max_future_epochs: self.max_future_epochs as u64, - incoming_queue: BTreeMap::new(), rng: Box::new(self.rng.sub_rng()), subset_handling_strategy: self.subset_handling_strategy.clone(), encryption_schedule: self.encryption_schedule, diff --git a/src/honey_badger/honey_badger.rs b/src/honey_badger/honey_badger.rs index 87aa426..595045c 100644 --- a/src/honey_badger/honey_badger.rs +++ b/src/honey_badger/honey_badger.rs @@ -3,12 +3,13 @@ use std::collections::BTreeMap; use std::sync::Arc; use derivative::Derivative; +use log::debug; use rand::{Rand, Rng}; use serde::{de::DeserializeOwned, Serialize}; use super::epoch_state::EpochState; -use super::{Batch, Error, ErrorKind, HoneyBadgerBuilder, Message, MessageContent, Result}; -use {util, Contribution, DistAlgorithm, NetworkInfo, NodeIdT}; +use super::{Batch, Error, ErrorKind, HoneyBadgerBuilder, Message, Result}; +use {util, Contribution, DistAlgorithm, Epoched, Fault, FaultKind, NetworkInfo, NodeIdT}; pub use super::epoch_state::SubsetHandlingStrategy; use threshold_decryption::EncryptionSchedule; @@ -30,8 +31,6 @@ pub struct HoneyBadger { pub(super) epochs: BTreeMap>, /// The maximum number of `Subset` instances that we run simultaneously. pub(super) max_future_epochs: u64, - /// Messages for future epochs that couldn't be handled yet. - pub(super) incoming_queue: BTreeMap)>>, /// A random number generator used for secret key generation. // Boxed to avoid overloading the algorithm's type with more generics. #[derivative(Debug(format_with = "util::fmt_rng"))] @@ -42,6 +41,14 @@ pub struct HoneyBadger { pub(super) encryption_schedule: EncryptionSchedule, } +impl Epoched for HoneyBadger { + type Epoch = u64; + + fn epoch(&self) -> Self::Epoch { + self.epoch + } +} + pub type Step = ::Step>; impl DistAlgorithm for HoneyBadger @@ -116,19 +123,17 @@ where return Err(ErrorKind::UnknownSender.into()); } let Message { epoch, content } = message; - if epoch > self.epoch + self.max_future_epochs { - // Postpone handling this message. - self.incoming_queue - .entry(epoch) - .or_insert_with(Vec::new) - .push((sender_id.clone(), content)); - } else if self.epoch <= epoch { + if self.epoch <= epoch && epoch <= self.epoch + self.max_future_epochs { let step = self .epoch_state_mut(epoch)? .handle_message_content(sender_id, content)?; - return Ok(step.join(self.try_output_batches()?)); - } // And ignore all messages from past epochs. - Ok(Step::default()) + Ok(step.join(self.try_output_batches()?)) + } else if epoch > self.epoch + self.max_future_epochs { + Ok(Fault::new(sender_id.clone(), FaultKind::UnexpectedHbMessageEpoch).into()) + } else { + // The message is late; discard it. + Ok(Step::default()) + } } /// Returns `true` if input for the current epoch has already been provided. @@ -149,20 +154,12 @@ where } /// Increments the epoch number and clears any state that is local to the finished epoch. - fn update_epoch(&mut self) -> Result> { + fn update_epoch(&mut self) { // Clear the state of the old epoch. self.epochs.remove(&self.epoch); self.epoch += 1; self.has_input = false; - let max_epoch = self.epoch + self.max_future_epochs; - let mut step = Step::default(); - if let Some(messages) = self.incoming_queue.remove(&max_epoch) { - let epoch_state = self.epoch_state_mut(max_epoch)?; - for (sender_id, content) in messages { - step.extend(epoch_state.handle_message_content(&sender_id, content)?); - } - } - Ok(step) + debug!("Started epoch {:?}", self.epoch); } /// Tries to decrypt contributions from all proposers and output those in a batch. @@ -176,7 +173,7 @@ where // Queue the output and advance the epoch. step.output.push(batch); step.fault_log.extend(fault_log); - step.extend(self.update_epoch()?); + self.update_epoch(); } Ok(step) } @@ -195,4 +192,9 @@ where )?), }) } + + /// Returns the maximum future epochs of the Honey Badger algorithm instance. + pub fn max_future_epochs(&self) -> u64 { + self.max_future_epochs + } } diff --git a/src/honey_badger/message.rs b/src/honey_badger/message.rs index e27bf05..bf0d5da 100644 --- a/src/honey_badger/message.rs +++ b/src/honey_badger/message.rs @@ -5,6 +5,8 @@ use serde_derive::{Deserialize, Serialize}; use subset; use threshold_decryption; +use Epoched; + /// The content of a `HoneyBadger` message. It should be further annotated with an epoch. #[derive(Clone, Debug, Deserialize, Rand, Serialize)] pub enum MessageContent { @@ -33,8 +35,10 @@ pub struct Message { pub(super) content: MessageContent, } -impl Message { - pub fn epoch(&self) -> u64 { +impl Epoched for Message { + type Epoch = u64; + + fn epoch(&self) -> u64 { self.epoch } } diff --git a/src/honey_badger/mod.rs b/src/honey_badger/mod.rs index 87d6cc7..5d59123 100644 --- a/src/honey_badger/mod.rs +++ b/src/honey_badger/mod.rs @@ -29,6 +29,8 @@ mod error; mod honey_badger; mod message; +pub mod sender_queueable; + pub use self::batch::Batch; pub use self::builder::HoneyBadgerBuilder; pub use self::error::{Error, ErrorKind, Result}; diff --git a/src/honey_badger/sender_queueable.rs b/src/honey_badger/sender_queueable.rs new file mode 100644 index 0000000..9fca529 --- /dev/null +++ b/src/honey_badger/sender_queueable.rs @@ -0,0 +1,53 @@ +use rand::Rand; +use serde::{de::DeserializeOwned, Serialize}; + +use super::{Batch, HoneyBadger, Message}; +use sender_queue::{ + SenderQueueableDistAlgorithm, SenderQueueableEpoch, SenderQueueableMessage, + SenderQueueableOutput, +}; +use {Contribution, Epoched, NodeIdT}; + +impl SenderQueueableOutput> for Batch +where + C: Contribution, + N: NodeIdT + Rand, +{ + fn added_node(&self) -> Option { + None + } + + fn convert_epoch(&self) -> u64 { + self.epoch() + } +} + +impl SenderQueueableMessage for Message +where + N: Rand, +{ + fn is_accepted(&self, them: u64, max_future_epochs: u64) -> bool { + let our_epoch = self.epoch(); + them <= our_epoch && our_epoch <= them + max_future_epochs + } + + fn is_obsolete(&self, them: u64) -> bool { + self.epoch() < them + } +} + +impl SenderQueueableEpoch for u64 { + fn spanning_epochs(&self) -> Vec { + vec![] + } +} + +impl SenderQueueableDistAlgorithm for HoneyBadger +where + C: Contribution + Serialize + DeserializeOwned, + N: NodeIdT + Rand, +{ + fn max_future_epochs(&self) -> u64 { + self.max_future_epochs() + } +} diff --git a/src/lib.rs b/src/lib.rs index bfcc284..6cdaa4d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -141,6 +141,7 @@ pub mod broadcast; pub mod dynamic_honey_badger; pub mod honey_badger; pub mod queueing_honey_badger; +pub mod sender_queue; pub mod subset; pub mod sync_key_gen; pub mod threshold_decryption; @@ -152,4 +153,4 @@ pub use crypto::pairing; pub use fault_log::{Fault, FaultKind, FaultLog}; pub use messaging::{SourcedMessage, Target, TargetedMessage}; pub use network_info::NetworkInfo; -pub use traits::{Contribution, DistAlgorithm, Message, NodeIdT, SessionIdT, Step}; +pub use traits::{Contribution, DistAlgorithm, Epoched, Message, NodeIdT, SessionIdT, Step}; diff --git a/src/queueing_honey_badger.rs b/src/queueing_honey_badger/mod.rs similarity index 97% rename from src/queueing_honey_badger.rs rename to src/queueing_honey_badger/mod.rs index fba30ae..3faa336 100644 --- a/src/queueing_honey_badger.rs +++ b/src/queueing_honey_badger/mod.rs @@ -22,6 +22,8 @@ //! entries, any two nodes will likely make almost disjoint contributions instead of proposing //! the same transaction multiple times. +pub mod sender_queueable; + use std::fmt::{self, Display}; use std::marker::PhantomData; use std::{cmp, iter}; @@ -35,7 +37,7 @@ use dynamic_honey_badger::{self, Batch as DhbBatch, DynamicHoneyBadger, Message} use transaction_queue::TransactionQueue; use {util, Contribution, DistAlgorithm, NodeIdT}; -pub use dynamic_honey_badger::{Change, ChangeState, Input, NodeChange}; +pub use dynamic_honey_badger::{Change, ChangeState, Epoch, Input, NodeChange}; /// Queueing honey badger error variants. #[derive(Debug, Fail)] @@ -177,7 +179,7 @@ where pub struct QueueingHoneyBadger { /// The target number of transactions to be included in each batch. batch_size: usize, - /// The internal `DynamicHoneyBadger` instance. + /// The internal managed `DynamicHoneyBadger` instance. dyn_hb: DynamicHoneyBadger, N>, /// The queue of pending transactions that haven't been output in a batch yet. queue: Q, @@ -275,7 +277,7 @@ where Ok(step.join(self.propose()?)) } - /// Returns a reference to the internal `DynamicHoneyBadger` instance. + /// Returns a reference to the internal managed `DynamicHoneyBadger` instance. pub fn dyn_hb(&self) -> &DynamicHoneyBadger, N> { &self.dyn_hb } diff --git a/src/queueing_honey_badger/sender_queueable.rs b/src/queueing_honey_badger/sender_queueable.rs new file mode 100644 index 0000000..c9791c4 --- /dev/null +++ b/src/queueing_honey_badger/sender_queueable.rs @@ -0,0 +1,18 @@ +use rand::Rand; +use serde::{de::DeserializeOwned, Serialize}; + +use super::QueueingHoneyBadger; +use sender_queue::SenderQueueableDistAlgorithm; +use transaction_queue::TransactionQueue; +use {Contribution, NodeIdT}; + +impl SenderQueueableDistAlgorithm for QueueingHoneyBadger +where + T: Contribution + Serialize + DeserializeOwned + Clone, + N: NodeIdT + Serialize + DeserializeOwned + Rand, + Q: TransactionQueue, +{ + fn max_future_epochs(&self) -> u64 { + self.dyn_hb.max_future_epochs() + } +} diff --git a/src/sender_queue/message.rs b/src/sender_queue/message.rs new file mode 100644 index 0000000..6d98f98 --- /dev/null +++ b/src/sender_queue/message.rs @@ -0,0 +1,46 @@ +use rand::{Rand, Rng}; +use serde_derive::{Deserialize, Serialize}; + +use Epoched; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum Message { + EpochStarted(::Epoch), + Algo(M), +} + +impl Rand for Message +where + M: Epoched + Rand, + ::Epoch: Rand, +{ + fn rand(rng: &mut R) -> Self { + let message_type = *rng.choose(&["epoch", "algo"]).unwrap(); + + match message_type { + "epoch" => Message::EpochStarted(rng.gen()), + "algo" => Message::Algo(rng.gen()), + _ => unreachable!(), + } + } +} + +impl Epoched for Message +where + M: Epoched, +{ + type Epoch = ::Epoch; + + fn epoch(&self) -> Self::Epoch { + match self { + Message::EpochStarted(epoch) => *epoch, + Message::Algo(message) => message.epoch(), + } + } +} + +impl From for Message { + fn from(message: M) -> Self { + Message::Algo(message) + } +} diff --git a/src/sender_queue/mod.rs b/src/sender_queue/mod.rs new file mode 100644 index 0000000..ac85821 --- /dev/null +++ b/src/sender_queue/mod.rs @@ -0,0 +1,361 @@ +//! # Sender queue +//! +//! A sender queue allows a `DistAlgorithm` that outputs `Epoched` messages to buffer those outgoing +//! messages based on their epochs. A message is sent to its recipient only when the recipient's +//! epoch matches the epoch of the message. Thus no queueing is required for incoming messages since +//! any incoming messages with non-matching epochs can be safely discarded. + +mod message; + +use std::collections::BTreeMap; +use std::fmt::Debug; + +use rand::Rand; +use serde::{de::DeserializeOwned, Serialize}; + +use {DistAlgorithm, Epoched, NodeIdT, Target}; + +pub use self::message::Message; + +pub trait SenderQueueableMessage: Epoched { + /// Whether the message is accepted in epoch `them`. + fn is_accepted(&self, them: ::Epoch, max_future_epochs: u64) -> bool; + + /// Whether the epoch of the message is behind `them`. + fn is_obsolete(&self, them: ::Epoch) -> bool; +} + +pub trait SenderQueueableOutput: Epoched +where + N: NodeIdT, + M: Epoched, +{ + /// Returns an optional new node added with the batch. This node should be added to the set of + /// all nodes. + fn added_node(&self) -> Option; + + /// Performs type conversion of the batch epoch info a fixed epoch type. + fn convert_epoch(&self) -> ::Epoch; +} + +pub trait SenderQueueableEpoch +where + Self: Sized, +{ + /// A _spanning epoch_ of an epoch `e` is an epoch `e0` such that + /// + /// - `e` and `e0` are incomparable by the partial ordering on epochs and + /// + /// - the duration of `e0` is at least that of `e`. + /// + /// Returned is a list of spanning epochs for the given epoch. + /// + /// For example, any `DynamicHoneyBadger` epoch `Epoch((x, Some(y)))` has a unique spanning + /// epoch `Epoch((x, None))`. In turn, no epoch `Epoch((x, None))` has a spanning epoch. + fn spanning_epochs(&self) -> Vec; +} + +pub trait SenderQueueableDistAlgorithm +where + Self: DistAlgorithm, +{ + /// The maximum number of subsequent future epochs that the `DistAlgorithm` is allowed to handle + /// messages for. + fn max_future_epochs(&self) -> u64; +} + +pub type OutgoingQueue = BTreeMap< + ( + ::NodeId, + <::Message as Epoched>::Epoch, + ), + Vec<::Message>, +>; + +/// An instance of `DistAlgorithm` wrapped with a queue of outgoing messages, that is, a sender +/// queue. This wrapping ensures that the messages sent to remote instances lead to progress of the +/// entire consensus network. In particular, messages to lagging remote nodes are queued and sent +/// only when those nodes' epochs match the queued messages' epochs. Thus all nodes can handle +/// incoming messages without queueing them and can ignore messages whose epochs are not currently +/// acccepted. +#[derive(Debug)] +pub struct SenderQueue +where + D: SenderQueueableDistAlgorithm, + D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned, + D::NodeId: NodeIdT + Rand, + D::Output: SenderQueueableOutput, +{ + /// The managed `DistAlgorithm` instance. + algo: D, + /// Our node ID. + our_id: D::NodeId, + /// Current epoch. + epoch: ::Epoch, + /// Messages that couldn't be handled yet by remote nodes. + outgoing_queue: OutgoingQueue, + /// The set of all remote nodes on the network including validator as well as non-validator + /// (observer) nodes together with their epochs as of the last communication. + peer_epochs: BTreeMap::Epoch>, +} + +pub type Step = ::Step>; + +pub type Result = ::std::result::Result::Error>; + +impl DistAlgorithm for SenderQueue +where + D: SenderQueueableDistAlgorithm + Debug + Send + Sync, + D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned, + D::NodeId: NodeIdT + Rand, + D::Output: SenderQueueableOutput, + ::Epoch: SenderQueueableEpoch, +{ + type NodeId = D::NodeId; + type Input = D::Input; + type Output = D::Output; + type Message = Message; + type Error = D::Error; + + fn handle_input(&mut self, input: Self::Input) -> Result, D> { + self.handle_input(input) + } + + fn handle_message( + &mut self, + sender_id: &D::NodeId, + message: Self::Message, + ) -> Result, D> { + self.handle_message(sender_id, message) + } + + fn terminated(&self) -> bool { + false + } + + fn our_id(&self) -> &D::NodeId { + &self.our_id + } +} + +impl SenderQueue +where + D: SenderQueueableDistAlgorithm + Debug + Send + Sync, + D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned, + D::NodeId: NodeIdT + Rand, + D::Output: SenderQueueableOutput, + ::Epoch: SenderQueueableEpoch, +{ + /// Returns a new `SenderQueueBuilder` configured to manage a given `DynamicHoneyBadger` instance. + pub fn builder(algo: D, peer_ids: I) -> SenderQueueBuilder + where + I: Iterator, + { + SenderQueueBuilder::new(algo, peer_ids) + } + + pub fn handle_input(&mut self, input: D::Input) -> Result, D> { + let mut step = self.algo.handle_input(input)?; + let mut sender_queue_step = self.update_epoch(&step); + self.defer_messages(&mut step); + sender_queue_step.extend(step.map(|output| output, Message::from)); + Ok(sender_queue_step) + } + + pub fn handle_message( + &mut self, + sender_id: &D::NodeId, + message: Message, + ) -> Result, D> { + match message { + Message::EpochStarted(epoch) => Ok(self.handle_epoch_started(sender_id, epoch)), + Message::Algo(msg) => self.handle_message_content(sender_id, msg), + } + } + + /// Handles an epoch start announcement. + fn handle_epoch_started( + &mut self, + sender_id: &D::NodeId, + epoch: ::Epoch, + ) -> Step { + self.peer_epochs + .entry(sender_id.clone()) + .and_modify(|e| { + if *e < epoch { + *e = epoch; + } + }).or_insert(epoch); + self.remove_earlier_messages(sender_id, epoch); + self.process_new_epoch(sender_id, epoch) + } + + /// Removes all messages queued for the remote node from epochs upto `epoch`. + fn remove_earlier_messages( + &mut self, + sender_id: &D::NodeId, + epoch: ::Epoch, + ) { + let earlier_keys: Vec<_> = self + .outgoing_queue + .keys() + .cloned() + .filter(|(id, this_epoch)| id == sender_id && *this_epoch < epoch) + .collect(); + for key in earlier_keys { + self.outgoing_queue.remove(&key); + } + } + + /// Processes an announcement of a new epoch update received from a remote node. + fn process_new_epoch( + &mut self, + sender_id: &D::NodeId, + epoch: ::Epoch, + ) -> Step { + // Send any HB messages for the HB epoch. + let mut ready_messages = self + .outgoing_queue + .remove(&(sender_id.clone(), epoch)) + .unwrap_or_default(); + for u in epoch.spanning_epochs() { + // Send any DHB messages for the DHB era. + ready_messages.extend( + self.outgoing_queue + .remove(&(sender_id.clone(), u)) + .unwrap_or_default(), + ); + } + Step::from( + ready_messages + .into_iter() + .map(|msg| Target::Node(sender_id.clone()).message(Message::Algo(msg))), + ) + } + + /// Handles a Honey Badger algorithm message in a given epoch. + fn handle_message_content( + &mut self, + sender_id: &D::NodeId, + content: D::Message, + ) -> Result, D> { + let mut step = self.algo.handle_message(sender_id, content)?; + let mut sender_queue_step = self.update_epoch(&step); + self.defer_messages(&mut step); + sender_queue_step.extend(step.map(|output| output, Message::from)); + Ok(sender_queue_step) + } + + /// Updates the current Honey Badger epoch. + fn update_epoch(&mut self, step: &::Step) -> Step { + // Look up `DynamicHoneyBadger` epoch updates and collect any added peers. + let new_epoch = step.output.iter().fold(self.epoch, |epoch, batch| { + let max_epoch = epoch.max(batch.convert_epoch()); + if let Some(node) = batch.added_node() { + if &node != self.our_id() { + self.peer_epochs + .entry(node) + .or_insert_with(::Epoch::default); + } + } + max_epoch + }); + if new_epoch != self.epoch { + self.epoch = new_epoch; + // Announce the new epoch. + Target::All + .message(Message::EpochStarted(self.epoch)) + .into() + } else { + Step::default() + } + } + + /// Removes any messages to nodes at earlier epochs from the given `Step`. This may involve + /// decomposing a `Target::All` message into `Target::Node` messages and sending some of the + /// resulting messages while placing onto the queue those remaining messages whose recipient is + /// currently at an earlier epoch. + fn defer_messages(&mut self, step: &mut ::Step) { + let max_future_epochs = self.algo.max_future_epochs(); + // Append the deferred messages onto the queues. + for (id, message) in step.defer_messages(&self.peer_epochs, max_future_epochs) { + let epoch = message.epoch(); + self.outgoing_queue + .entry((id, epoch)) + .or_insert_with(Vec::new) + .push(message); + } + } + + /// Returns a reference to the managed algorithm. + pub fn algo(&self) -> &D { + &self.algo + } +} + +/// A builder of a Honey Badger with a sender queue. It configures the parameters and creates a new +/// instance of `SenderQueue`. +pub struct SenderQueueBuilder +where + D: SenderQueueableDistAlgorithm, + D::Message: Epoched, +{ + algo: D, + epoch: ::Epoch, + outgoing_queue: OutgoingQueue, + peer_epochs: BTreeMap::Epoch>, +} + +impl SenderQueueBuilder +where + D: SenderQueueableDistAlgorithm + Debug + Send + Sync, + D::Message: Clone + SenderQueueableMessage + Serialize + DeserializeOwned, + D::NodeId: NodeIdT + Rand, + D::Output: SenderQueueableOutput, + ::Epoch: SenderQueueableEpoch, +{ + pub fn new(algo: D, peer_ids: I) -> Self + where + I: Iterator, + { + SenderQueueBuilder { + algo, + epoch: ::Epoch::default(), + outgoing_queue: BTreeMap::default(), + peer_epochs: peer_ids + .map(|id| (id, ::Epoch::default())) + .collect(), + } + } + + pub fn epoch(mut self, epoch: ::Epoch) -> Self { + self.epoch = epoch; + self + } + + pub fn outgoing_queue(mut self, outgoing_queue: OutgoingQueue) -> Self { + self.outgoing_queue = outgoing_queue; + self + } + + pub fn peer_epochs( + mut self, + peer_epochs: BTreeMap::Epoch>, + ) -> Self { + self.peer_epochs = peer_epochs; + self + } + + pub fn build(self, our_id: D::NodeId) -> (SenderQueue, Step) { + let epoch = ::Epoch::default(); + let sq = SenderQueue { + algo: self.algo, + our_id, + epoch: self.epoch, + outgoing_queue: self.outgoing_queue, + peer_epochs: self.peer_epochs, + }; + let step: Step = Target::All.message(Message::EpochStarted(epoch)).into(); + (sq, step) + } +} diff --git a/src/traits.rs b/src/traits.rs index 56bffe4..021f781 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -1,14 +1,17 @@ //! Common supertraits for distributed algorithms. +use std::collections::{BTreeMap, BTreeSet}; use std::fmt::{Debug, Display}; use std::hash::Hash; use std::iter::once; use failure::Fail; -use serde::Serialize; +use rand::Rand; +use serde::{de::DeserializeOwned, Serialize}; use fault_log::{Fault, FaultLog}; -use TargetedMessage; +use sender_queue::SenderQueueableMessage; +use {Target, TargetedMessage}; /// A transaction, user message, or other user data. pub trait Contribution: Eq + Debug + Hash + Send + Sync {} @@ -183,6 +186,120 @@ impl From> for Step } } +impl From for Step +where + D: DistAlgorithm, + I: IntoIterator>, +{ + fn from(msgs: I) -> Self { + Step { + messages: msgs.into_iter().collect(), + ..Step::default() + } + } +} + +/// An interface to objects with epoch numbers. Different algorithms may have different internal +/// notion of _epoch_. This interface summarizes the properties that are essential for the message +/// sender queue. +pub trait Epoched { + type Epoch: Copy + Message + Default + Eq + Ord + Serialize + DeserializeOwned; + + /// Returns the object's epoch number. + fn epoch(&self) -> Self::Epoch; +} + +impl Epoched for TargetedMessage { + type Epoch = ::Epoch; + + fn epoch(&self) -> Self::Epoch { + self.message.epoch() + } +} + +impl<'i, D> Step +where + D: DistAlgorithm, + ::NodeId: NodeIdT + Rand, + ::Message: + 'i + Clone + SenderQueueableMessage + Serialize + DeserializeOwned, + ::Output: Epoched, +{ + /// Removes and returns any messages that are not yet accepted by remote nodes according to the + /// mapping `remote_epochs`. This way the returned messages are postponed until later, and the + /// remaining messages can be sent to remote nodes without delay. + pub fn defer_messages( + &mut self, + peer_epochs: &'i BTreeMap::Epoch>, + max_future_epochs: u64, + ) -> Vec<(D::NodeId, D::Message)> + where + ::NodeId: 'i, + { + let messages = &mut self.messages; + let (mut passed_msgs, failed_msgs): (Vec<_>, Vec<_>) = + messages + .drain(..) + .partition(|TargetedMessage { target, message }| match target { + Target::All => peer_epochs + .values() + .all(|&them| message.is_accepted(them, max_future_epochs)), + Target::Node(id) => peer_epochs + .get(&id) + .map_or(false, |&them| message.is_accepted(them, max_future_epochs)), + }); + // `Target::All` messages contained in the result of the partitioning are analyzed further + // and each split into two sets of point messages: those which can be sent without delay and + // those which should be postponed. + let remote_nodes: BTreeSet<&D::NodeId> = peer_epochs.keys().collect(); + let mut deferred_msgs: Vec<(D::NodeId, D::Message)> = Vec::new(); + for msg in failed_msgs { + let m = msg.message; + match msg.target { + Target::Node(id) => { + let defer = { + let lagging = |&them| { + !(m.is_accepted(them, max_future_epochs) || m.is_obsolete(them)) + }; + peer_epochs.get(&id).map_or(true, lagging) + }; + if defer { + deferred_msgs.push((id, m)); + } + } + Target::All => { + let isnt_earlier_epoch = + |&them| m.is_accepted(them, max_future_epochs) || m.is_obsolete(them); + let lagging = |them| !isnt_earlier_epoch(them); + let accepts = |&them| m.is_accepted(them, max_future_epochs); + let accepting_nodes: BTreeSet<&D::NodeId> = peer_epochs + .iter() + .filter(|(_, them)| accepts(them)) + .map(|(id, _)| id) + .collect(); + let non_lagging_nodes: BTreeSet<&D::NodeId> = peer_epochs + .iter() + .filter(|(_, them)| isnt_earlier_epoch(them)) + .map(|(id, _)| id) + .collect(); + for &id in &accepting_nodes { + passed_msgs.push(Target::Node(id.clone()).message(m.clone())); + } + let lagging_nodes: BTreeSet<_> = + remote_nodes.difference(&non_lagging_nodes).collect(); + for &id in lagging_nodes { + if peer_epochs.get(id).map_or(true, lagging) { + deferred_msgs.push((id.clone(), m.clone())); + } + } + } + } + } + messages.extend(passed_msgs); + deferred_msgs + } +} + /// A distributed algorithm that defines a message flow. pub trait DistAlgorithm: Send + Sync { /// Unique node identifier. diff --git a/tests/dynamic_honey_badger.rs b/tests/dynamic_honey_badger.rs index 63ad8d4..cab0cbb 100644 --- a/tests/dynamic_honey_badger.rs +++ b/tests/dynamic_honey_badger.rs @@ -13,6 +13,7 @@ extern crate threshold_crypto as crypto; mod network; use std::collections::BTreeMap; +use std::iter; use std::sync::Arc; use itertools::Itertools; @@ -22,12 +23,13 @@ use rand::{Isaac64Rng, Rng}; use hbbft::dynamic_honey_badger::{ Batch, Change, ChangeState, DynamicHoneyBadger, Input, NodeChange, }; +use hbbft::sender_queue::{SenderQueue, Step}; use hbbft::transaction_queue::TransactionQueue; use hbbft::NetworkInfo; use network::{Adversary, MessageScheduler, NodeId, SilentAdversary, TestNetwork, TestNode}; -type UsizeDhb = DynamicHoneyBadger, NodeId>; +type UsizeDhb = SenderQueue, NodeId>>; /// Proposes `num_txs` values and expects nodes to output and order them. fn test_dynamic_honey_badger(mut network: TestNetwork, num_txs: usize) @@ -80,8 +82,8 @@ where .iter() .filter(|(_, node)| { node_busy(*node) - && !node.instance().has_input() - && node.instance().netinfo().is_validator() + && !node.instance().algo().has_input() + && node.instance().algo().netinfo().is_validator() // Wait until all nodes have completed removing 0, before inputting `Add`. && (input_add || !has_remove(node)) // If there's only one node, it will immediately output on input. Make sure we @@ -99,6 +101,7 @@ where if !input_add && network.nodes.values().all(has_remove) { let pk = network.nodes[&NodeId(0)] .instance() + .algo() .netinfo() .secret_key() .public_key(); @@ -114,8 +117,20 @@ where // Allow passing `netinfo` by value. `TestNetwork` expects this function signature. #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] -fn new_dynamic_hb(netinfo: Arc>) -> UsizeDhb { - DynamicHoneyBadger::builder().build((*netinfo).clone()) +fn new_dynamic_hb( + netinfo: Arc>, +) -> (UsizeDhb, Step, NodeId>>) { + let observer = NodeId(netinfo.num_nodes()); + let our_id = *netinfo.our_id(); + let peer_ids = netinfo + .all_ids() + .filter(|&&them| them != our_id) + .cloned() + .chain(iter::once(observer)); + SenderQueue::builder( + DynamicHoneyBadger::builder().build((*netinfo).clone()), + peer_ids, + ).build(our_id) } fn test_dynamic_honey_badger_different_sizes(new_adversary: F, num_txs: usize) @@ -137,7 +152,8 @@ where num_good_nodes, num_adv_nodes ); let adversary = |adv_nodes| new_adversary(num_good_nodes, num_adv_nodes, adv_nodes); - let network = TestNetwork::new(num_good_nodes, num_adv_nodes, adversary, new_dynamic_hb); + let network = + TestNetwork::new_with_step(num_good_nodes, num_adv_nodes, adversary, new_dynamic_hb); test_dynamic_honey_badger(network, num_txs); } } diff --git a/tests/honey_badger.rs b/tests/honey_badger.rs index 1ca34e4..872b7a2 100644 --- a/tests/honey_badger.rs +++ b/tests/honey_badger.rs @@ -14,22 +14,24 @@ extern crate threshold_crypto as crypto; mod network; use std::collections::BTreeMap; +use std::iter; use std::sync::Arc; use itertools::Itertools; use log::info; use rand::Rng; -use hbbft::honey_badger::{self, Batch, HoneyBadger, MessageContent}; +use hbbft::honey_badger::{Batch, HoneyBadger, MessageContent}; +use hbbft::sender_queue::{self, SenderQueue, Step}; use hbbft::transaction_queue::TransactionQueue; -use hbbft::{threshold_decryption, NetworkInfo, Target, TargetedMessage}; +use hbbft::{threshold_decryption, DistAlgorithm, Epoched, NetworkInfo, Target, TargetedMessage}; use network::{ Adversary, MessageScheduler, MessageWithSender, NodeId, RandomAdversary, SilentAdversary, TestNetwork, TestNode, }; -type UsizeHoneyBadger = HoneyBadger, NodeId>; +type UsizeHoneyBadger = SenderQueue, NodeId>>; /// An adversary whose nodes only send messages with incorrect decryption shares. pub struct FaultyShareAdversary { @@ -66,7 +68,7 @@ impl Adversary for FaultyShareAdversary { fn push_message( &mut self, sender_id: NodeId, - msg: TargetedMessage, NodeId>, + msg: TargetedMessage<::Message, NodeId>, ) { let NodeId(sender_id) = sender_id; if sender_id < self.num_good { @@ -105,12 +107,12 @@ impl Adversary for FaultyShareAdversary { for proposer_id in 0..self.num_good + self.num_adv { outgoing.push(MessageWithSender::new( NodeId(sender_id), - Target::All.message( + Target::All.message(sender_queue::Message::Algo( MessageContent::DecryptionShare { proposer_id: NodeId(proposer_id), share: threshold_decryption::Message(share.clone()), }.with_epoch(*epoch), - ), + )), )) } } @@ -142,7 +144,7 @@ where let input_ids: Vec<_> = network .nodes .iter() - .filter(|(_, node)| !node.instance().has_input()) + .filter(|(_, node)| !node.instance().algo().has_input()) .map(|(id, _)| *id) .collect(); if let Some(id) = rng.choose(&input_ids) { @@ -181,8 +183,18 @@ where } } -fn new_honey_badger(netinfo: Arc>) -> UsizeHoneyBadger { - HoneyBadger::builder(netinfo).build() +fn new_honey_badger( + netinfo: Arc>, +) -> (UsizeHoneyBadger, Step, NodeId>>) { + let our_id = *netinfo.our_id(); + let observer = NodeId(netinfo.num_nodes()); + let nc = netinfo.clone(); + let peer_ids = nc + .all_ids() + .filter(|&&them| them != our_id) + .cloned() + .chain(iter::once(observer)); + SenderQueue::builder(HoneyBadger::builder(netinfo).build(), peer_ids).build(our_id) } fn test_honey_badger_different_sizes(new_adversary: F, num_txs: usize) @@ -203,7 +215,8 @@ where num_good_nodes, num_adv_nodes ); let adversary = |adv_nodes| new_adversary(num_good_nodes, num_adv_nodes, adv_nodes); - let network = TestNetwork::new(num_good_nodes, num_adv_nodes, adversary, new_honey_badger); + let network = + TestNetwork::new_with_step(num_good_nodes, num_adv_nodes, adversary, new_honey_badger); test_honey_badger(network, num_txs); } } diff --git a/tests/net_dynamic_hb.rs b/tests/net_dynamic_hb.rs index 91496a5..29f8e69 100644 --- a/tests/net_dynamic_hb.rs +++ b/tests/net_dynamic_hb.rs @@ -10,10 +10,11 @@ pub mod net; use std::{collections, time}; use hbbft::dynamic_honey_badger::{Change, ChangeState, DynamicHoneyBadger, Input, NodeChange}; -use hbbft::DistAlgorithm; +use hbbft::sender_queue::SenderQueue; +use hbbft::Epoched; use net::adversary::ReorderingAdversary; use net::proptest::{gen_seed, NetworkDimension, TestRng, TestRngSeed}; -use net::NetBuilder; +use net::{NetBuilder, NewNodeInfo}; use proptest::{prelude::ProptestConfig, prop_compose, proptest, proptest_helper}; use rand::{Rng, SeedableRng}; @@ -102,11 +103,16 @@ fn do_drop_and_readd(cfg: TestConfig) { // Ensure runs are reproducible. .rng(rng.gen::()) .adversary(ReorderingAdversary::new(rng.gen::())) - .using(move |node| { - println!("Constructing new dynamic honey badger node #{}", node.id); - DynamicHoneyBadger::builder() + .using_step(move |node: NewNodeInfo>| { + let id = node.id; + println!("Constructing new dynamic honey badger node #{}", id); + let dhb = DynamicHoneyBadger::builder() .rng(node.rng) - .build(node.netinfo) + .build(node.netinfo.clone()); + SenderQueue::builder( + dhb, + node.netinfo.all_ids().filter(|&&them| them != id).cloned(), + ).build(node.id) }).build() .expect("could not construct test network"); @@ -166,6 +172,7 @@ fn do_drop_and_readd(cfg: TestConfig) { // Now we can add the node again. Public keys will be reused. let pk = net[*pivot_node_id] .algorithm() + .algo() .netinfo() .secret_key() .public_key(); @@ -205,7 +212,7 @@ fn do_drop_and_readd(cfg: TestConfig) { // Examine potential algorithm output. for batch in step.output { println!( - "Received epoch {} batch on node {:?}.", + "Received epoch {:?} batch on node {:?}.", batch.epoch(), node_id, ); diff --git a/tests/queueing_honey_badger.rs b/tests/queueing_honey_badger.rs index ac486fd..e4b0410 100644 --- a/tests/queueing_honey_badger.rs +++ b/tests/queueing_honey_badger.rs @@ -13,6 +13,7 @@ extern crate threshold_crypto as crypto; mod network; use std::collections::BTreeMap; +use std::iter; use std::sync::Arc; use itertools::Itertools; @@ -21,13 +22,14 @@ use rand::{Isaac64Rng, Rng}; use hbbft::dynamic_honey_badger::DynamicHoneyBadger; use hbbft::queueing_honey_badger::{ - Batch, Change, ChangeState, Input, NodeChange, QueueingHoneyBadger, Step, + Batch, Change, ChangeState, Input, NodeChange, QueueingHoneyBadger, }; +use hbbft::sender_queue::{Message, SenderQueue, Step}; use hbbft::NetworkInfo; use network::{Adversary, MessageScheduler, NodeId, SilentAdversary, TestNetwork, TestNode}; -type QHB = QueueingHoneyBadger>; +type QHB = SenderQueue>>; /// Proposes `num_txs` values and expects nodes to output and order them. fn test_queueing_honey_badger(mut network: TestNetwork, num_txs: usize) @@ -80,6 +82,7 @@ where } let pk = network.nodes[&NodeId(0)] .instance() + .algo() .dyn_hb() .netinfo() .secret_key() @@ -96,12 +99,22 @@ where // Allow passing `netinfo` by value. `TestNetwork` expects this function signature. #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] -fn new_queueing_hb(netinfo: Arc>) -> (QHB, Step>) { - let dyn_hb = DynamicHoneyBadger::builder().build((*netinfo).clone()); +fn new_queueing_hb( + netinfo: Arc>, +) -> (QHB, Step>>) { + let observer = NodeId(netinfo.num_nodes()); + let our_id = *netinfo.our_id(); + let peer_ids = netinfo + .all_ids() + .filter(|&&them| them != our_id) + .cloned() + .chain(iter::once(observer)); + let dhb = DynamicHoneyBadger::builder().build((*netinfo).clone()); let rng = rand::thread_rng().gen::(); - QueueingHoneyBadger::builder(dyn_hb) - .batch_size(3) - .build(rng) + let (qhb, qhb_step) = QueueingHoneyBadger::builder(dhb).batch_size(3).build(rng); + let (sq, mut step) = SenderQueue::builder(qhb, peer_ids).build(our_id); + step.extend_with(qhb_step, Message::from); + (sq, step) } fn test_queueing_honey_badger_different_sizes(new_adversary: F, num_txs: usize)