From 432c865d4b1721ad21bb1d9a1639a931cad5836a Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Sat, 21 Jul 2018 10:18:08 +0200 Subject: [PATCH] Handle initial QHB messages, fix DHB. --- examples/simulation.rs | 36 ++++-- src/dynamic_honey_badger/builder.rs | 137 ++++++++++----------- src/dynamic_honey_badger/mod.rs | 179 ++++++++++------------------ src/messaging.rs | 72 +++++++++++ src/queueing_honey_badger.rs | 86 +++++-------- tests/dynamic_honey_badger.rs | 4 +- tests/network/mod.rs | 26 +++- tests/queueing_honey_badger.rs | 13 +- 8 files changed, 285 insertions(+), 268 deletions(-) diff --git a/examples/simulation.rs b/examples/simulation.rs index 154d1dd..0ba2b16 100644 --- a/examples/simulation.rs +++ b/examples/simulation.rs @@ -144,15 +144,33 @@ where D::Message: Serialize + DeserializeOwned, { /// Creates a new test node with the given broadcast instance. - fn new(algo: D, hw_quality: HwQuality) -> TestNode { + fn new((algo, step): (D, Step), hw_quality: HwQuality) -> TestNode { + let out_queue = step + .messages + .into_iter() + .map(|msg| { + let ser_msg = bincode::serialize(&msg.message).expect("serialize"); + TimestampedMessage { + time: Duration::default(), + sender_id: algo.our_id().clone(), + target: msg.target, + message: ser_msg, + } + }) + .collect(); + let outputs = step + .output + .into_iter() + .map(|out| (Duration::default(), out)) + .collect(); let mut node = TestNode { id: algo.our_id().clone(), algo, time: Duration::default(), sent_time: Duration::default(), in_queue: VecDeque::new(), - out_queue: VecDeque::new(), - outputs: Vec::new(), + out_queue, + outputs, message_count: 0, message_size: 0, hw_quality, @@ -184,10 +202,8 @@ where .messages .into_iter() .map(|msg| { - ( - msg.target, - bincode::serialize(&msg.message).expect("serialize"), - ) + let ser_msg = bincode::serialize(&msg.message).expect("serialize"); + (msg.target, ser_msg) }) .collect(); self.time += start.elapsed() * self.hw_quality.cpu_factor / 100; @@ -250,7 +266,7 @@ where hw_quality: HwQuality, ) -> TestNetwork where - F: Fn(NetworkInfo) -> D, + F: Fn(NetworkInfo) -> (D, Step), { let netinfos = NetworkInfo::generate_map((0..(good_num + adv_num)).map(NodeUid)); let new_node = |(uid, netinfo): (NodeUid, NetworkInfo<_>)| { @@ -430,9 +446,7 @@ fn main() { let num_good_nodes = args.flag_n - args.flag_f; let txs = (0..args.flag_txs).map(|_| Transaction::new(args.flag_tx_size)); let new_honey_badger = |netinfo: NetworkInfo| { - let dyn_hb = DynamicHoneyBadger::builder(netinfo) - .build() - .expect("instantiate DynamicHoneyBadger"); + let dyn_hb = DynamicHoneyBadger::builder().build(netinfo); QueueingHoneyBadger::builder(dyn_hb) .batch_size(args.flag_b) .build_with_transactions(txs.clone()) diff --git a/src/dynamic_honey_badger/builder.rs b/src/dynamic_honey_badger/builder.rs index e559ea3..ee67764 100644 --- a/src/dynamic_honey_badger/builder.rs +++ b/src/dynamic_honey_badger/builder.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::default::Default; use std::fmt::Debug; use std::hash::Hash; use std::iter::once; @@ -8,7 +8,7 @@ use std::sync::Arc; use rand::{self, Rand, Rng}; use serde::{Deserialize, Serialize}; -use super::{ChangeState, DynamicHoneyBadger, JoinPlan, MessageQueue, Result, VoteCounter}; +use super::{ChangeState, DynamicHoneyBadger, JoinPlan, Result, Step, VoteCounter}; use crypto::{SecretKey, SecretKeySet, SecretKeyShare}; use honey_badger::HoneyBadger; use messaging::NetworkInfo; @@ -16,15 +16,19 @@ use messaging::NetworkInfo; /// A Dynamic Honey Badger builder, to configure the parameters and create new instances of /// `DynamicHoneyBadger`. pub struct DynamicHoneyBadgerBuilder { - /// Shared network data. - netinfo: NetworkInfo, - /// The epoch at which to join the network. - start_epoch: u64, - /// The current change, for which key generation is beginning at `start_epoch`. - change: ChangeState, /// The maximum number of future epochs for which we handle messages simultaneously. max_future_epochs: usize, - _phantom: PhantomData, + _phantom: PhantomData<(C, NodeUid)>, +} + +impl Default for DynamicHoneyBadgerBuilder { + fn default() -> Self { + // TODO: Use the defaults from `HoneyBadgerBuilder`. + DynamicHoneyBadgerBuilder { + max_future_epochs: 3, + _phantom: PhantomData, + } + } } impl DynamicHoneyBadgerBuilder @@ -34,51 +38,8 @@ where { /// Returns a new `DynamicHoneyBadgerBuilder` configured to use the node IDs and cryptographic /// keys specified by `netinfo`. - pub fn new(netinfo: NetworkInfo) -> Self { - // TODO: Use the defaults from `HoneyBadgerBuilder`. - DynamicHoneyBadgerBuilder { - netinfo, - start_epoch: 0, - change: ChangeState::None, - max_future_epochs: 3, - _phantom: PhantomData, - } - } - - /// Returns a new `DynamicHoneyBadgerBuilder` configured to start a new network as a single - /// validator. - pub fn new_first_node(our_uid: NodeUid) -> Self { - let mut rng = rand::thread_rng(); - let sk_set = SecretKeySet::random(0, &mut rng); - let pk_set = sk_set.public_keys(); - let sks = sk_set.secret_key_share(0); - let sk: SecretKey = rng.gen(); - let pub_keys = once((our_uid.clone(), sk.public_key())).collect(); - let netinfo = NetworkInfo::new(our_uid, sks, pk_set, sk, pub_keys); - DynamicHoneyBadgerBuilder::new(netinfo) - } - - /// Returns a new `DynamicHoneyBadgerBuilder` configured to join the network at the epoch - /// specified in the `JoinPlan`. - pub fn new_joining( - our_uid: NodeUid, - secret_key: SecretKey, - join_plan: JoinPlan, - ) -> Self { - let netinfo = NetworkInfo::new( - our_uid, - SecretKeyShare::default(), // TODO: Should be an option? - join_plan.pub_key_set, - secret_key, - join_plan.pub_keys, - ); - DynamicHoneyBadgerBuilder { - netinfo, - start_epoch: join_plan.epoch, - change: join_plan.change, - max_future_epochs: 3, - _phantom: PhantomData, - } + pub fn new() -> Self { + Self::default() } /// Sets the maximum number of future epochs for which we handle messages simultaneously. @@ -88,26 +49,68 @@ where } /// Creates a new Dynamic Honey Badger instance with an empty buffer. - pub fn build(&self) -> Result> { - let netinfo = Arc::new(self.netinfo.clone()); - let honey_badger = HoneyBadger::builder(netinfo.clone()) + pub fn build(&self, netinfo: NetworkInfo) -> DynamicHoneyBadger { + let arc_netinfo = Arc::new(netinfo.clone()); + let honey_badger = HoneyBadger::builder(arc_netinfo.clone()) .max_future_epochs(self.max_future_epochs) .build(); - let mut dhb = DynamicHoneyBadger { - netinfo: self.netinfo.clone(), + DynamicHoneyBadger { + netinfo, max_future_epochs: self.max_future_epochs, - start_epoch: self.start_epoch, - vote_counter: VoteCounter::new(netinfo, self.start_epoch), + start_epoch: 0, + vote_counter: VoteCounter::new(arc_netinfo, 0), key_gen_msg_buffer: Vec::new(), honey_badger, key_gen: None, incoming_queue: Vec::new(), - messages: MessageQueue(VecDeque::new()), - output: VecDeque::new(), - }; - if let ChangeState::InProgress(ref change) = self.change { - dhb.update_key_gen(self.start_epoch, change)?; } - Ok(dhb) + } + + /// Creates a new `DynamicHoneyBadger` configured to start a new network as a single validator. + pub fn build_first_node(&self, our_uid: NodeUid) -> DynamicHoneyBadger { + let mut rng = rand::thread_rng(); + let sk_set = SecretKeySet::random(0, &mut rng); + let pk_set = sk_set.public_keys(); + let sks = sk_set.secret_key_share(0); + let sk: SecretKey = rng.gen(); + let pub_keys = once((our_uid.clone(), sk.public_key())).collect(); + let netinfo = NetworkInfo::new(our_uid, sks, pk_set, sk, pub_keys); + self.build(netinfo) + } + + /// Creates a new `DynamicHoneyBadger` configured to join the network at the epoch specified in + /// the `JoinPlan`. + pub fn build_joining( + &self, + our_uid: NodeUid, + secret_key: SecretKey, + join_plan: JoinPlan, + ) -> Result<(DynamicHoneyBadger, Step)> { + let netinfo = NetworkInfo::new( + our_uid, + SecretKeyShare::default(), // TODO: Should be an option? + join_plan.pub_key_set, + secret_key, + join_plan.pub_keys, + ); + let arc_netinfo = Arc::new(netinfo.clone()); + let honey_badger = HoneyBadger::builder(arc_netinfo.clone()) + .max_future_epochs(self.max_future_epochs) + .build(); + let mut dhb = DynamicHoneyBadger { + netinfo, + max_future_epochs: self.max_future_epochs, + start_epoch: join_plan.epoch, + vote_counter: VoteCounter::new(arc_netinfo, join_plan.epoch), + key_gen_msg_buffer: Vec::new(), + honey_badger, + key_gen: None, + incoming_queue: Vec::new(), + }; + let step = match join_plan.change { + ChangeState::InProgress(ref change) => dhb.update_key_gen(join_plan.epoch, change)?, + ChangeState::None | ChangeState::Complete(..) => Step::default(), + }; + Ok((dhb, step)) } } diff --git a/src/dynamic_honey_badger/mod.rs b/src/dynamic_honey_badger/mod.rs index 186e7d7..968e95c 100644 --- a/src/dynamic_honey_badger/mod.rs +++ b/src/dynamic_honey_badger/mod.rs @@ -56,7 +56,7 @@ //! majority before that happens, key generation resets again, and is attempted for the new change. use rand::Rand; -use std::collections::{BTreeMap, VecDeque}; +use std::collections::BTreeMap; use std::fmt::Debug; use std::hash::Hash; use std::mem; @@ -66,10 +66,10 @@ use bincode; use serde::{Deserialize, Serialize}; use self::votes::{SignedVote, VoteCounter}; -use crypto::{PublicKey, PublicKeySet, SecretKey, Signature}; +use crypto::{PublicKey, PublicKeySet, Signature}; use fault_log::{FaultKind, FaultLog}; use honey_badger::{self, HoneyBadger, Message as HbMessage}; -use messaging::{self, DistAlgorithm, NetworkInfo, Target, TargetedMessage}; +use messaging::{self, DistAlgorithm, NetworkInfo, Target}; use sync_key_gen::{Ack, Part, PartOutcome, SyncKeyGen}; pub use self::batch::Batch; @@ -110,10 +110,6 @@ pub struct DynamicHoneyBadger { key_gen: Option<(SyncKeyGen, Change)>, /// A queue for messages from future epochs that cannot be handled yet. incoming_queue: Vec<(NodeUid, Message)>, - /// The messages that need to be sent to other nodes. - messages: MessageQueue, - /// The outputs from completed epochs. - output: VecDeque>, } pub type Step = messaging::Step>; @@ -132,11 +128,10 @@ where fn input(&mut self, input: Self::Input) -> Result> { // User contributions are forwarded to `HoneyBadger` right away. Votes are signed and // broadcast. - let fault_log = match input { - Input::User(contrib) => self.propose(contrib)?, - Input::Change(change) => self.vote_for(change).map(|()| FaultLog::new())?, - }; - self.step(fault_log) + match input { + Input::User(contrib) => self.propose(contrib), + Input::Change(change) => self.vote_for(change), + } } fn handle_message( @@ -145,28 +140,29 @@ where message: Self::Message, ) -> Result> { let epoch = message.start_epoch(); - let fault_log = if epoch < self.start_epoch { + if epoch < self.start_epoch { // Obsolete message. - FaultLog::new() + Ok(Step::default()) } else if epoch > self.start_epoch { // Message cannot be handled yet. Save it for later. let entry = (sender_id.clone(), message); self.incoming_queue.push(entry); - FaultLog::new() + Ok(Step::default()) } else { match message { Message::HoneyBadger(_, hb_msg) => { - self.handle_honey_badger_message(sender_id, hb_msg)? + self.handle_honey_badger_message(sender_id, hb_msg) } Message::KeyGen(_, kg_msg, sig) => { - self.handle_key_gen_message(sender_id, kg_msg, *sig)? - } - Message::SignedVote(signed_vote) => { - self.vote_counter.add_pending_vote(sender_id, signed_vote)? + self.handle_key_gen_message(sender_id, kg_msg, *sig)?; + Ok(Step::default()) } + Message::SignedVote(signed_vote) => self + .vote_counter + .add_pending_vote(sender_id, signed_vote) + .map(FaultLog::into), } - }; - self.step(fault_log) + } } fn terminated(&self) -> bool { @@ -183,34 +179,9 @@ where C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash, NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash + Rand, { - fn step(&mut self, fault_log: FaultLog) -> Result> { - Ok(Step::new( - self.output.drain(..).collect(), - fault_log, - self.messages.drain(..).collect(), - )) - } - - /// Returns a new `DynamicHoneyBadgerBuilder` configured to use the node IDs and cryptographic - /// keys specified by `netinfo`. - pub fn builder(netinfo: NetworkInfo) -> DynamicHoneyBadgerBuilder { - DynamicHoneyBadgerBuilder::new(netinfo) - } - - /// Returns a new `DynamicHoneyBadgerBuilder` configured to start a new network as the first - /// node. - pub fn first_node_builder(our_uid: NodeUid) -> DynamicHoneyBadgerBuilder { - DynamicHoneyBadgerBuilder::new_first_node(our_uid) - } - - /// Returns a new `DynamicHoneyBadgerBuilder` configured to join the network at the epoch - /// specified in the `JoinPlan`. - pub fn joining_builder( - our_uid: NodeUid, - secret_key: SecretKey, - join_plan: JoinPlan, - ) -> DynamicHoneyBadgerBuilder { - DynamicHoneyBadgerBuilder::new_joining(our_uid, secret_key, join_plan) + /// Returns a new `DynamicHoneyBadgerBuilder`. + pub fn builder() -> DynamicHoneyBadgerBuilder { + DynamicHoneyBadgerBuilder::new() } /// Returns `true` if input for the current epoch has already been provided. @@ -219,7 +190,7 @@ where } /// Proposes a contribution in the current epoch. - pub fn propose(&mut self, contrib: C) -> Result> { + pub fn propose(&mut self, contrib: C) -> Result> { let step = self.honey_badger.input(InternalContrib { contrib, key_gen_messages: self.key_gen_msg_buffer.clone(), @@ -229,14 +200,13 @@ where } /// Cast a vote to change the set of validators. - pub fn vote_for(&mut self, change: Change) -> Result<()> { + pub fn vote_for(&mut self, change: Change) -> Result> { if !self.netinfo.is_validator() { - return Ok(()); // TODO: Return an error? + return Ok(Step::default()); // TODO: Return an error? } let signed_vote = self.vote_counter.sign_vote_for(change)?.clone(); let msg = Message::SignedVote(signed_vote); - self.messages.push_back(Target::All.message(msg)); - Ok(()) + Ok(Target::All.message(msg).into()) } /// Returns the information about the node IDs in the network, and the cryptographic keys. @@ -249,12 +219,12 @@ where &mut self, sender_id: &NodeUid, message: HbMessage, - ) -> Result> { + ) -> Result> { if !self.netinfo.is_node_validator(sender_id) { info!("Unknown sender {:?} of message {:?}", sender_id, message); return Err(ErrorKind::UnknownSender.into()); } - // Handle the message and put the outgoing messages into the queue. + // Handle the message. let step = self.honey_badger.handle_message(sender_id, message)?; self.process_output(step) } @@ -266,22 +236,22 @@ where sender_id: &NodeUid, kg_msg: KeyGenMessage, sig: Signature, - ) -> Result> { + ) -> Result<()> { self.verify_signature(sender_id, &sig, &kg_msg)?; let tx = SignedKeyGenMsg(self.start_epoch, sender_id.clone(), kg_msg, sig); self.key_gen_msg_buffer.push(tx); - Ok(FaultLog::default()) + Ok(()) } /// Processes all pending batches output by Honey Badger. fn process_output( &mut self, - step: honey_badger::Step, NodeUid>, - ) -> Result> { - let mut fault_log = FaultLog::new(); - fault_log.extend(step.fault_log); + hb_step: honey_badger::Step, NodeUid>, + ) -> Result> { + let mut step: Step = Step::default(); let start_epoch = self.start_epoch; - for hb_batch in step.output { + let output = step.extend_with(hb_step, |hb_msg| Message::HoneyBadger(start_epoch, hb_msg)); + for hb_batch in output { // Create the batch we output ourselves. It will contain the _user_ transactions of // `hb_batch`, and the current change state. let mut batch = Batch::new(hb_batch.epoch + self.start_epoch); @@ -293,7 +263,8 @@ where key_gen_messages, contrib, } = int_contrib; - fault_log.extend(self.vote_counter.add_committed_votes(&id, votes)?); + step.fault_log + .extend(self.vote_counter.add_committed_votes(&id, votes)?); batch.contributions.insert(id, contrib); self.key_gen_msg_buffer .retain(|skgm| !key_gen_messages.contains(skgm)); @@ -305,13 +276,13 @@ where if !self.verify_signature(&s_id, &sig, &kg_msg)? { info!("Invalid signature from {:?} for: {:?}.", s_id, kg_msg); let fault_kind = FaultKind::InvalidKeyGenMessageSignature; - fault_log.append(s_id.clone(), fault_kind); + step.fault_log.append(s_id.clone(), fault_kind); continue; } - match kg_msg { + step.extend(match kg_msg { KeyGenMessage::Part(part) => self.handle_part(&s_id, part)?, - KeyGenMessage::Ack(ack) => self.handle_ack(&s_id, ack)?, - }.merge_into(&mut fault_log); + KeyGenMessage::Ack(ack) => self.handle_ack(&s_id, ack)?.into(), + }); } } @@ -323,30 +294,26 @@ where batch.set_change(ChangeState::Complete(change), &self.netinfo); } else if let Some(change) = self.vote_counter.compute_majority().cloned() { // If there is a majority, restart DKG. Inform the user about the current change. - self.update_key_gen(batch.epoch + 1, &change)?; + step.extend(self.update_key_gen(batch.epoch + 1, &change)?); batch.set_change(ChangeState::InProgress(change), &self.netinfo); } - self.output.push_back(batch); + step.output.push_back(batch); } - self.messages - .extend_with_epoch(self.start_epoch, step.messages); // If `start_epoch` changed, we can now handle some queued messages. if start_epoch < self.start_epoch { let queue = mem::replace(&mut self.incoming_queue, Vec::new()); for (sender_id, msg) in queue { - let rec_step = self.handle_message(&sender_id, msg)?; - self.output.extend(rec_step.output); - fault_log.extend(rec_step.fault_log); + step.extend(self.handle_message(&sender_id, msg)?); } } - Ok(fault_log) + Ok(step) } /// If the majority of votes has changed, restarts Key Generation for the set of nodes implied /// by the current change. - fn update_key_gen(&mut self, epoch: u64, change: &Change) -> Result<()> { + fn update_key_gen(&mut self, epoch: u64, change: &Change) -> Result> { if self.key_gen.as_ref().map(|&(_, ref ch)| ch) == Some(change) { - return Ok(()); // The change is the same as before. Continue DKG as is. + return Ok(Step::default()); // The change is the same as before. Continue DKG as is. } debug!("{:?} Restarting DKG for {:?}.", self.our_id(), change); // Use the existing key shares - with the change applied - as keys for DKG. @@ -366,9 +333,10 @@ where let (key_gen, part) = SyncKeyGen::new(our_uid, sk, pub_keys, threshold); self.key_gen = Some((key_gen, change.clone())); if let Some(part) = part { - self.send_transaction(KeyGenMessage::Part(part))?; + self.send_transaction(KeyGenMessage::Part(part)) + } else { + Ok(Step::default()) } - Ok(()) } /// Starts a new `HoneyBadger` instance and resets the vote counter. @@ -384,17 +352,14 @@ where } /// Handles a `Part` message that was output by Honey Badger. - fn handle_part(&mut self, sender_id: &NodeUid, part: Part) -> Result> { + fn handle_part(&mut self, sender_id: &NodeUid, part: Part) -> Result> { let handle = |&mut (ref mut key_gen, _): &mut (SyncKeyGen, _)| { key_gen.handle_part(&sender_id, part) }; match self.key_gen.as_mut().and_then(handle) { - Some(PartOutcome::Valid(ack)) => { - self.send_transaction(KeyGenMessage::Ack(ack))?; - Ok(FaultLog::new()) - } - Some(PartOutcome::Invalid(fault_log)) => Ok(fault_log), - None => Ok(FaultLog::new()), + Some(PartOutcome::Valid(ack)) => self.send_transaction(KeyGenMessage::Ack(ack)), + Some(PartOutcome::Invalid(fault_log)) => Ok(fault_log.into()), + None => Ok(Step::default()), } } @@ -408,18 +373,17 @@ where } /// Signs and sends a `KeyGenMessage` and also tries to commit it. - fn send_transaction(&mut self, kg_msg: KeyGenMessage) -> Result<()> { + fn send_transaction(&mut self, kg_msg: KeyGenMessage) -> Result> { let ser = bincode::serialize(&kg_msg)?; let sig = Box::new(self.netinfo.secret_key().sign(ser)); - let msg = Message::KeyGen(self.start_epoch, kg_msg.clone(), sig.clone()); - self.messages.push_back(Target::All.message(msg)); - if !self.netinfo.is_validator() { - return Ok(()); + if self.netinfo.is_validator() { + let our_uid = self.netinfo.our_uid().clone(); + let signed_msg = + SignedKeyGenMsg(self.start_epoch, our_uid, kg_msg.clone(), *sig.clone()); + self.key_gen_msg_buffer.push(signed_msg); } - let our_uid = self.netinfo.our_uid().clone(); - let signed_msg = SignedKeyGenMsg(self.start_epoch, our_uid, kg_msg, *sig); - self.key_gen_msg_buffer.push(signed_msg); - Ok(()) + let msg = Message::KeyGen(self.start_epoch, kg_msg, sig); + Ok(Target::All.message(msg).into()) } /// If the current Key Generation process is ready, returns the `SyncKeyGen`. @@ -517,27 +481,6 @@ impl Message { } } -/// The queue of outgoing messages in a `HoneyBadger` instance. -#[derive(Deref, DerefMut)] -struct MessageQueue(VecDeque, NodeUid>>); - -impl MessageQueue -where - NodeUid: Eq + Hash + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Rand, -{ - /// Appends to the queue the messages from `hb`, wrapped with `epoch`. - fn extend_with_epoch( - &mut self, - epoch: u64, - mut msgs: VecDeque, NodeUid>>, - ) { - let convert = |msg: TargetedMessage, NodeUid>| { - msg.map(|hb_msg| Message::HoneyBadger(epoch, hb_msg)) - }; - self.extend(msgs.drain(..).map(convert)); - } -} - /// The information a new node requires to join the network as an observer. It contains the state /// of voting and key generation after a specific epoch, so that the new node will be in sync if it /// joins in the next one. diff --git a/src/messaging.rs b/src/messaging.rs index aeebab8..44e48e4 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -1,5 +1,6 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::fmt::Debug; +use std::iter::once; use crypto::{PublicKey, PublicKeySet, PublicKeyShare, SecretKey, SecretKeyShare}; use fault_log::FaultLog; @@ -80,6 +81,7 @@ impl Step where ::NodeUid: Clone, { + /// Creates a new `Step` from the given collections. pub fn new( output: VecDeque, fault_log: FaultLog, @@ -91,6 +93,76 @@ where messages, } } + + /// Converts `self` into a step of another type, given conversion methods for output and + /// messages. + pub fn map(self, f_out: FO, f_msg: FM) -> Step + where + D2: DistAlgorithm, + FO: Fn(D::Output) -> D2::Output, + FM: Fn(D::Message) -> D2::Message, + { + Step { + output: self.output.into_iter().map(f_out).collect(), + fault_log: self.fault_log, + messages: self.messages.into_iter().map(|tm| tm.map(&f_msg)).collect(), + } + } + + /// Extends `self` with `other`s messages and fault logs, and returns `other.output`. + pub fn extend_with(&mut self, other: Step, f_msg: FM) -> VecDeque + where + D2: DistAlgorithm, + FM: Fn(D2::Message) -> D::Message, + { + self.fault_log.extend(other.fault_log); + let msgs = other.messages.into_iter().map(|tm| tm.map(&f_msg)); + self.messages.extend(msgs); + other.output + } + + /// Adds the outputs, fault logs and messages of `other` to `self`. + pub fn extend(&mut self, other: Self) { + self.output.extend(other.output); + self.fault_log.extend(other.fault_log); + self.messages.extend(other.messages); + } + + /// Converts this step into an equivalent step for a different `DistAlgorithm`. + // This cannot be a `From` impl, because it would conflict with `impl From for T`. + pub fn convert(self) -> Step + where + D2: DistAlgorithm, + { + Step { + output: self.output, + fault_log: self.fault_log, + messages: self.messages, + } + } + + /// Returns `true` if there are now messages, faults or outputs. + pub fn is_empty(&self) -> bool { + self.output.is_empty() && self.fault_log.is_empty() && self.messages.is_empty() + } +} + +impl From> for Step { + fn from(fault_log: FaultLog) -> Self { + Step { + fault_log, + ..Step::default() + } + } +} + +impl From> for Step { + fn from(msg: TargetedMessage) -> Self { + Step { + messages: once(msg).collect(), + ..Step::default() + } + } } /// A distributed algorithm that defines a message flow. diff --git a/src/queueing_honey_badger.rs b/src/queueing_honey_badger.rs index bdde222..c6eee20 100644 --- a/src/queueing_honey_badger.rs +++ b/src/queueing_honey_badger.rs @@ -21,7 +21,6 @@ //! the same transaction multiple times. use std::cmp; -use std::collections::VecDeque; use std::fmt::Debug; use std::hash::Hash; use std::marker::PhantomData; @@ -30,8 +29,7 @@ use rand::Rand; use serde::{Deserialize, Serialize}; use dynamic_honey_badger::{self, Batch as DhbBatch, DynamicHoneyBadger, Message}; -use fault_log::FaultLog; -use messaging::{self, DistAlgorithm, TargetedMessage}; +use messaging::{self, DistAlgorithm}; use transaction_queue::TransactionQueue; pub use dynamic_honey_badger::{Change, ChangeState, Input}; @@ -59,6 +57,8 @@ where { /// Returns a new `QueueingHoneyBadgerBuilder` configured to use the node IDs and cryptographic /// keys specified by `netinfo`. + // TODO: Make it easier to build a `QueueingHoneyBadger` with a `JoinPlan`. Handle `Step` + // conversion internally. pub fn new(dyn_hb: DynamicHoneyBadger, NodeUid>) -> Self { // TODO: Use the defaults from `HoneyBadgerBuilder`. QueueingHoneyBadgerBuilder { @@ -75,7 +75,7 @@ where } /// Creates a new Queueing Honey Badger instance with an empty buffer. - pub fn build(self) -> QueueingHoneyBadger + pub fn build(self) -> (QueueingHoneyBadger, Step) where Tx: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq, { @@ -85,7 +85,10 @@ where /// Returns a new Queueing Honey Badger instance that starts with the given transactions in its /// buffer. - pub fn build_with_transactions(self, txs: TI) -> Result> + pub fn build_with_transactions( + self, + txs: TI, + ) -> Result<(QueueingHoneyBadger, Step)> where TI: IntoIterator, Tx: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq, @@ -95,10 +98,9 @@ where dyn_hb: self.dyn_hb, queue, batch_size: self.batch_size, - output: VecDeque::new(), }; - let _ = qhb.propose()?; // Fault log is empty: no contact with other nodes yet. - Ok(qhb) + let step = qhb.propose()?; + Ok((qhb, step)) } } @@ -115,8 +117,6 @@ where dyn_hb: DynamicHoneyBadger, NodeUid>, /// The queue of pending transactions that haven't been output in a batch yet. queue: TransactionQueue, - /// The outputs from completed epochs. - output: VecDeque>, } pub type Step = messaging::Step>; @@ -135,18 +135,13 @@ where fn input(&mut self, input: Self::Input) -> Result> { // User transactions are forwarded to `HoneyBadger` right away. Internal messages are // in addition signed and broadcast. - let (fault_log, messages) = match input { + match input { Input::User(tx) => { self.queue.0.push_back(tx); - (FaultLog::new(), VecDeque::new()) + Ok(Step::default()) } - Input::Change(change) => { - let step = self.dyn_hb.input(Input::Change(change))?; - // FIXME: Use the output since `dyn_hb` can output immediately on input. - (step.fault_log, step.messages) - } - }; - self.step(fault_log, messages) + Input::Change(change) => Ok(self.dyn_hb.input(Input::Change(change))?.convert()), + } } fn handle_message( @@ -154,19 +149,15 @@ where sender_id: &NodeUid, message: Self::Message, ) -> Result> { - let dynamic_honey_badger::Step { - output, - mut fault_log, - mut messages, - } = self.dyn_hb.handle_message(sender_id, message)?; - for batch in output { + let mut step = self + .dyn_hb + .handle_message(sender_id, message)? + .convert::(); + for batch in &step.output { self.queue.remove_all(batch.iter()); - self.output.push_back(batch); } - let (propose_fault_log, propose_messages) = self.propose()?; - fault_log.extend(propose_fault_log); - messages.extend(propose_messages); - self.step(fault_log, messages) + step.extend(self.propose()?); + Ok(step) } fn terminated(&self) -> bool { @@ -178,11 +169,6 @@ where } } -type ProposeResult = Result<( - FaultLog, - VecDeque, NodeUid>>, -)>; - impl QueueingHoneyBadger where Tx: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash + Clone, @@ -196,40 +182,22 @@ where QueueingHoneyBadgerBuilder::new(dyn_hb) } - fn step( - &mut self, - fault_log: FaultLog, - messages: VecDeque, NodeUid>>, - ) -> Result> { - Ok(Step::new( - self.output.drain(..).collect(), - fault_log, - messages, - )) - } - /// Returns a reference to the internal `DynamicHoneyBadger` instance. pub fn dyn_hb(&self) -> &DynamicHoneyBadger, NodeUid> { &self.dyn_hb } /// Initiates the next epoch by proposing a batch from the queue. - fn propose(&mut self) -> ProposeResult { + fn propose(&mut self) -> Result> { let amount = cmp::max(1, self.batch_size / self.dyn_hb.netinfo().num_nodes()); - // TODO: This will loop forever if we are the only validator. - let mut fault_log = FaultLog::new(); - let mut messages = VecDeque::new(); - while !self.dyn_hb.has_input() { + // TODO: This will output immediately if we are the only validator. + if self.dyn_hb.has_input() { + Ok(Step::default()) // Error? + } else { let proposal = self.queue.choose(amount, self.batch_size); let step = self.dyn_hb.input(Input::User(proposal))?; - fault_log.extend(step.fault_log); - messages.extend(step.messages); - for batch in step.output { - self.queue.remove_all(batch.iter()); - self.output.push_back(batch); - } + Ok(Step::new(step.output, step.fault_log, step.messages)) } - Ok((fault_log, messages)) } } diff --git a/tests/dynamic_honey_badger.rs b/tests/dynamic_honey_badger.rs index 5e6d0ba..9be87a3 100644 --- a/tests/dynamic_honey_badger.rs +++ b/tests/dynamic_honey_badger.rs @@ -127,9 +127,7 @@ where // Allow passing `netinfo` by value. `TestNetwork` expects this function signature. #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] fn new_dynamic_hb(netinfo: Arc>) -> UsizeDhb { - DynamicHoneyBadger::builder((*netinfo).clone()) - .build() - .expect("instantiate DHB") + DynamicHoneyBadger::builder().build((*netinfo).clone()) } fn test_dynamic_honey_badger_different_sizes(new_adversary: F, num_txs: usize) diff --git a/tests/network/mod.rs b/tests/network/mod.rs index 9a5886e..159409c 100644 --- a/tests/network/mod.rs +++ b/tests/network/mod.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use rand::{self, Rng}; use hbbft::crypto::SecretKeyShare; -use hbbft::messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage}; +use hbbft::messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage}; /// A node identifier. In the tests, nodes are simply numbered. #[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy, Serialize, Deserialize, Rand)] @@ -52,13 +52,13 @@ impl TestNode { } /// Creates a new test node with the given broadcast instance. - fn new(algo: D) -> TestNode { + fn new((algo, step): (D, Step)) -> TestNode { TestNode { id: algo.our_id().clone(), algo, queue: VecDeque::new(), - outputs: Vec::new(), - messages: VecDeque::new(), + outputs: step.output.into_iter().collect(), + messages: step.messages, } } @@ -369,6 +369,7 @@ where { /// Creates a new network with `good_num` good nodes, and the given `adversary` controlling /// `adv_num` nodes. + #[allow(unused)] // Not used in all tests. pub fn new( good_num: usize, adv_num: usize, @@ -378,6 +379,23 @@ where where F: Fn(Arc>) -> D, G: Fn(BTreeMap>>) -> A, + { + Self::new_with_step(good_num, adv_num, adversary, |netinfo| { + (new_algo(netinfo), Step::default()) + }) + } + + /// Creates a new network with `good_num` good nodes, and the given `adversary` controlling + /// `adv_num` nodes. + pub fn new_with_step( + good_num: usize, + adv_num: usize, + adversary: G, + new_algo: F, + ) -> TestNetwork + where + F: Fn(Arc>) -> (D, Step), + G: Fn(BTreeMap>>) -> A, { let mut rng = rand::thread_rng(); let mut netinfos = NetworkInfo::generate_map((0..(good_num + adv_num)).map(NodeUid)); diff --git a/tests/queueing_honey_badger.rs b/tests/queueing_honey_badger.rs index 2689cb7..7853eb3 100644 --- a/tests/queueing_honey_badger.rs +++ b/tests/queueing_honey_badger.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use hbbft::dynamic_honey_badger::DynamicHoneyBadger; use hbbft::messaging::NetworkInfo; -use hbbft::queueing_honey_badger::{Batch, Change, ChangeState, Input, QueueingHoneyBadger}; +use hbbft::queueing_honey_badger::{Batch, Change, ChangeState, Input, QueueingHoneyBadger, Step}; use itertools::Itertools; use rand::Rng; @@ -107,10 +107,10 @@ where // Allow passing `netinfo` by value. `TestNetwork` expects this function signature. #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] -fn new_queueing_hb(netinfo: Arc>) -> QueueingHoneyBadger { - let dyn_hb = DynamicHoneyBadger::builder((*netinfo).clone()) - .build() - .expect("instantiate DHB"); +fn new_queueing_hb( + netinfo: Arc>, +) -> (QueueingHoneyBadger, Step) { + let dyn_hb = DynamicHoneyBadger::builder().build((*netinfo).clone()); QueueingHoneyBadger::builder(dyn_hb).batch_size(3).build() } @@ -133,7 +133,8 @@ where num_good_nodes, num_adv_nodes ); let adversary = |adv_nodes| new_adversary(num_good_nodes, num_adv_nodes, adv_nodes); - let network = TestNetwork::new(num_good_nodes, num_adv_nodes, adversary, new_queueing_hb); + let network = + TestNetwork::new_with_step(num_good_nodes, num_adv_nodes, adversary, new_queueing_hb); test_queueing_honey_badger(network, num_txs); } }