diff --git a/examples/network/node.rs b/examples/network/node.rs index f2865f4..ce5a359 100644 --- a/examples/network/node.rs +++ b/examples/network/node.rs @@ -144,8 +144,8 @@ impl + PartialEq + Send + Sync + From> + if let Some(v) = value { // FIXME: Use the output. - let _ = broadcast.input(v.clone().into()).expect("propose value"); - for msg in broadcast.message_iter() { + let step = broadcast.input(v.clone().into()).expect("propose value"); + for msg in step.messages { tx_from_algo.send(msg).expect("send from algo"); } } @@ -158,7 +158,7 @@ impl + PartialEq + Send + Sync + From> + let step = broadcast .handle_message(&i, message) .expect("handle broadcast message"); - for msg in broadcast.message_iter() { + for msg in step.messages { debug!("{} sending to {:?}: {:?}", our_id, msg.target, msg.message); tx_from_algo.send(msg).expect("send from algo"); } diff --git a/examples/simulation.rs b/examples/simulation.rs index f51dd7b..0ba2b16 100644 --- a/examples/simulation.rs +++ b/examples/simulation.rs @@ -137,20 +137,40 @@ pub struct TestNode { hw_quality: HwQuality, } +type TestNodeStepResult = Step; + impl TestNode where D::Message: Serialize + DeserializeOwned, { /// Creates a new test node with the given broadcast instance. - fn new(algo: D, hw_quality: HwQuality) -> TestNode { + fn new((algo, step): (D, Step), hw_quality: HwQuality) -> TestNode { + let out_queue = step + .messages + .into_iter() + .map(|msg| { + let ser_msg = bincode::serialize(&msg.message).expect("serialize"); + TimestampedMessage { + time: Duration::default(), + sender_id: algo.our_id().clone(), + target: msg.target, + message: ser_msg, + } + }) + .collect(); + let outputs = step + .output + .into_iter() + .map(|out| (Duration::default(), out)) + .collect(); let mut node = TestNode { id: algo.our_id().clone(), algo, time: Duration::default(), sent_time: Duration::default(), in_queue: VecDeque::new(), - out_queue: VecDeque::new(), - outputs: Vec::new(), + out_queue, + outputs, message_count: 0, message_size: 0, hw_quality, @@ -176,19 +196,14 @@ where } /// Handles the algorithm's output and messages. - fn send_output_and_msgs( - &mut self, - step: Step<::NodeUid, ::Output>, - ) { + fn send_output_and_msgs(&mut self, step: TestNodeStepResult) { let start = Instant::now(); - let out_msgs: Vec<_> = self - .algo - .message_iter() + let out_msgs: Vec<_> = step + .messages + .into_iter() .map(|msg| { - ( - msg.target, - bincode::serialize(&msg.message).expect("serialize"), - ) + let ser_msg = bincode::serialize(&msg.message).expect("serialize"); + (msg.target, ser_msg) }) .collect(); self.time += start.elapsed() * self.hw_quality.cpu_factor / 100; @@ -251,7 +266,7 @@ where hw_quality: HwQuality, ) -> TestNetwork where - F: Fn(NetworkInfo) -> D, + F: Fn(NetworkInfo) -> (D, Step), { let netinfos = NetworkInfo::generate_map((0..(good_num + adv_num)).map(NodeUid)); let new_node = |(uid, netinfo): (NodeUid, NetworkInfo<_>)| { @@ -431,9 +446,7 @@ fn main() { let num_good_nodes = args.flag_n - args.flag_f; let txs = (0..args.flag_txs).map(|_| Transaction::new(args.flag_tx_size)); let new_honey_badger = |netinfo: NetworkInfo| { - let dyn_hb = DynamicHoneyBadger::builder(netinfo) - .build() - .expect("instantiate DynamicHoneyBadger"); + let dyn_hb = DynamicHoneyBadger::builder().build(netinfo); QueueingHoneyBadger::builder(dyn_hb) .batch_size(args.flag_b) .build_with_transactions(txs.clone()) diff --git a/src/agreement/mod.rs b/src/agreement/mod.rs index bd3f623..16a69af 100644 --- a/src/agreement/mod.rs +++ b/src/agreement/mod.rs @@ -74,20 +74,15 @@ use std::sync::Arc; use itertools::Itertools; use agreement::bin_values::BinValues; -use common_coin; -use common_coin::{CommonCoin, CommonCoinMessage, CommonCoinStep}; +use common_coin::{self, CommonCoin, CommonCoinMessage}; use fault_log::FaultLog; -use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage}; +use messaging::{self, DistAlgorithm, NetworkInfo, Target, TargetedMessage}; error_chain!{ links { CommonCoin(common_coin::Error, common_coin::ErrorKind); } - types { - Error, ErrorKind, ResultExt, AgreementResult; - } - errors { UnknownProposer { description("unknown proposer") @@ -208,7 +203,7 @@ pub struct Agreement { coin_schedule: CoinSchedule, } -pub type AgreementStep = Step; +pub type Step = messaging::Step>; impl DistAlgorithm for Agreement { type NodeUid = NodeUid; @@ -217,7 +212,7 @@ impl DistAlgorithm for Agreement { type Message = AgreementMessage; type Error = Error; - fn input(&mut self, input: Self::Input) -> AgreementResult> { + fn input(&mut self, input: Self::Input) -> Result> { let fault_log = self.set_input(input)?; self.step(fault_log) } @@ -227,7 +222,7 @@ impl DistAlgorithm for Agreement { &mut self, sender_id: &Self::NodeUid, message: Self::Message, - ) -> AgreementResult> { + ) -> Result> { let fault_log = if self.terminated || message.epoch < self.epoch { // Message is obsolete: We are already in a later epoch or terminated. FaultLog::new() @@ -247,13 +242,6 @@ impl DistAlgorithm for Agreement { self.step(fault_log) } - /// Take the next Agreement message for multicast to all other nodes. - fn next_message(&mut self) -> Option> { - self.messages - .pop_front() - .map(|msg| Target::All.message(msg)) - } - /// Whether the algorithm has terminated. fn terminated(&self) -> bool { self.terminated @@ -269,7 +257,7 @@ impl Agreement { netinfo: Arc>, session_id: u64, proposer_id: NodeUid, - ) -> AgreementResult { + ) -> Result { let invocation_id = netinfo.invocation_id(); if let Some(proposer_i) = netinfo.node_index(&proposer_id) { Ok(Agreement { @@ -301,15 +289,19 @@ impl Agreement { } } - fn step(&mut self, fault_log: FaultLog) -> AgreementResult> { + fn step(&mut self, fault_log: FaultLog) -> Result> { Ok(Step::new( self.output.take().into_iter().collect(), fault_log, + self.messages + .drain(..) + .map(|msg| Target::All.message(msg)) + .collect(), )) } /// Sets the input value for agreement. - fn set_input(&mut self, input: bool) -> AgreementResult> { + fn set_input(&mut self, input: bool) -> Result> { if self.epoch != 0 || self.estimated.is_some() { return Err(ErrorKind::InputNotAccepted.into()); } @@ -331,7 +323,7 @@ impl Agreement { self.epoch == 0 && self.estimated.is_none() } - fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> AgreementResult> { + fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result> { self.received_bval .entry(sender_id.clone()) .or_insert_with(BTreeSet::new) @@ -369,7 +361,7 @@ impl Agreement { /// Called when `bin_values` changes as a result of receiving a `BVal` message. Tries to update /// the epoch. - fn on_bin_values_changed(&mut self) -> AgreementResult> { + fn on_bin_values_changed(&mut self) -> Result> { match self.coin_schedule { CoinSchedule::False => { let (aux_count, aux_vals) = self.count_aux(); @@ -395,7 +387,7 @@ impl Agreement { } } - fn send_bval(&mut self, b: bool) -> AgreementResult> { + fn send_bval(&mut self, b: bool) -> Result> { if !self.netinfo.is_validator() { return Ok(FaultLog::new()); } @@ -409,7 +401,7 @@ impl Agreement { self.handle_bval(our_uid, b) } - fn send_conf(&mut self) -> AgreementResult> { + fn send_conf(&mut self) -> Result> { if self.conf_round { // Only one `Conf` message is allowed in an epoch. return Ok(FaultLog::new()); @@ -436,7 +428,7 @@ impl Agreement { /// bin_values (note that bin_values_r may continue to change as `BVal` /// messages are received, thus this condition may be triggered upon arrival /// of either an `Aux` or a `BVal` message). - fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> AgreementResult> { + fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result> { // Perform the `Aux` message round only if a `Conf` round hasn't started yet. if self.conf_round { return Ok(FaultLog::new()); @@ -462,11 +454,7 @@ impl Agreement { } } - fn handle_conf( - &mut self, - sender_id: &NodeUid, - v: BinValues, - ) -> AgreementResult> { + fn handle_conf(&mut self, sender_id: &NodeUid, v: BinValues) -> Result> { self.received_conf.insert(sender_id.clone(), v); self.try_finish_conf_round() } @@ -474,7 +462,7 @@ impl Agreement { /// Receives a `Term(v)` message. If we haven't yet decided on a value and there are more than /// `num_faulty` such messages with the same value from different nodes, performs expedite /// termination: decides on `v`, broadcasts `Term(v)` and terminates the instance. - fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> AgreementResult> { + fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> Result> { self.received_term.insert(sender_id.clone(), b); // Check for the expedite termination condition. if self.decision.is_none() @@ -492,20 +480,26 @@ impl Agreement { &mut self, sender_id: &NodeUid, msg: CommonCoinMessage, - ) -> AgreementResult> { + ) -> Result> { let coin_step = self.common_coin.handle_message(sender_id, msg)?; - self.extend_common_coin(); self.on_coin_step(coin_step) } fn on_coin_step( &mut self, - coin_step: CommonCoinStep, - ) -> AgreementResult> { - let Step { + coin_step: common_coin::Step, + ) -> Result> { + let common_coin::Step { output, mut fault_log, + messages, } = coin_step; + let epoch = self.epoch; + self.messages.extend(messages.into_iter().map( + |msg: TargetedMessage| { + AgreementContent::Coin(Box::new(msg.message)).with_epoch(epoch) + }, + )); if let Some(coin) = output.into_iter().next() { let def_bin_value = self.count_conf().1.definite(); fault_log.extend(self.on_coin(coin, def_bin_value)?); @@ -515,11 +509,7 @@ impl Agreement { /// When the common coin has been computed, tries to decide on an output value, updates the /// `Agreement` epoch and handles queued messages for the new epoch. - fn on_coin( - &mut self, - coin: bool, - def_bin_value: Option, - ) -> AgreementResult> { + fn on_coin(&mut self, coin: bool, def_bin_value: Option) -> Result> { let mut fault_log = FaultLog::new(); if self.terminated { // Avoid an infinite regression without making an Agreement step. @@ -562,16 +552,6 @@ impl Agreement { } } - /// Propagates Common Coin messages to the top level. - fn extend_common_coin(&mut self) { - let epoch = self.epoch; - self.messages.extend(self.common_coin.message_iter().map( - |msg: TargetedMessage| { - AgreementContent::Coin(Box::new(msg.message)).with_epoch(epoch) - }, - )); - } - /// Decides on a value and broadcasts a `Term` message with that value. fn decide(&mut self, b: bool) { if self.terminated { @@ -596,13 +576,12 @@ impl Agreement { self.terminated = true; } - fn try_finish_conf_round(&mut self) -> AgreementResult> { + fn try_finish_conf_round(&mut self) -> Result> { if self.conf_round && self.count_conf().0 >= self.netinfo.num_nodes() - self.netinfo.num_faulty() { // Invoke the common coin. let coin_step = self.common_coin.input(())?; - self.extend_common_coin(); self.on_coin_step(coin_step) } else { // Continue waiting for (N - f) `Conf` messages @@ -610,7 +589,7 @@ impl Agreement { } } - fn send_aux(&mut self, b: bool) -> AgreementResult> { + fn send_aux(&mut self, b: bool) -> Result> { if !self.netinfo.is_validator() { return Ok(FaultLog::new()); } diff --git a/src/broadcast.rs b/src/broadcast.rs index c5554cd..09b1806 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -141,13 +141,9 @@ use ring::digest; use fault_log::{FaultKind, FaultLog}; use fmt::{HexBytes, HexList, HexProof}; -use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage}; +use messaging::{self, DistAlgorithm, NetworkInfo, Target, TargetedMessage}; error_chain!{ - types { - Error, ErrorKind, ResultExt, BroadcastResult; - } - foreign_links { ReedSolomon(rse::Error); } @@ -228,7 +224,7 @@ pub struct Broadcast { output: Option>, } -pub type BroadcastStep = Step>; +pub type Step = messaging::Step>; impl DistAlgorithm for Broadcast { type NodeUid = NodeUid; @@ -239,7 +235,7 @@ impl DistAlgorithm for Broadcast { type Message = BroadcastMessage; type Error = Error; - fn input(&mut self, input: Self::Input) -> BroadcastResult> { + fn input(&mut self, input: Self::Input) -> Result> { if *self.netinfo.our_uid() != self.proposer_id { return Err(ErrorKind::InstanceCannotPropose.into()); } @@ -256,7 +252,7 @@ impl DistAlgorithm for Broadcast { &mut self, sender_id: &NodeUid, message: Self::Message, - ) -> BroadcastResult> { + ) -> Result> { if !self.netinfo.is_node_validator(sender_id) { return Err(ErrorKind::UnknownSender.into()); } @@ -270,10 +266,6 @@ impl DistAlgorithm for Broadcast { self.step(fault_log) } - fn next_message(&mut self) -> Option> { - self.messages.pop_front() - } - fn terminated(&self) -> bool { self.decided } @@ -286,7 +278,7 @@ impl DistAlgorithm for Broadcast { impl Broadcast { /// Creates a new broadcast instance to be used by node `our_id` which expects a value proposal /// from node `proposer_id`. - pub fn new(netinfo: Arc>, proposer_id: NodeUid) -> BroadcastResult { + pub fn new(netinfo: Arc>, proposer_id: NodeUid) -> Result { let parity_shard_num = 2 * netinfo.num_faulty(); let data_shard_num = netinfo.num_nodes() - parity_shard_num; let coding = Coding::new(data_shard_num, parity_shard_num)?; @@ -306,10 +298,11 @@ impl Broadcast { }) } - fn step(&mut self, fault_log: FaultLog) -> BroadcastResult> { + fn step(&mut self, fault_log: FaultLog) -> Result> { Ok(Step::new( self.output.take().into_iter().collect(), fault_log, + self.messages.drain(..).collect(), )) } @@ -318,7 +311,7 @@ impl Broadcast { /// scheme. The returned value contains the shard assigned to this /// node. That shard doesn't need to be sent anywhere. It gets recorded in /// the broadcast instance. - fn send_shards(&mut self, mut value: Vec) -> BroadcastResult>> { + fn send_shards(&mut self, mut value: Vec) -> Result>> { let data_shard_num = self.coding.data_shard_count(); let parity_shard_num = self.coding.parity_shard_count(); @@ -396,7 +389,7 @@ impl Broadcast { &mut self, sender_id: &NodeUid, p: Proof>, - ) -> BroadcastResult> { + ) -> Result> { // If the sender is not the proposer or if this is not the first `Value`, ignore. if *sender_id != self.proposer_id { info!( @@ -428,11 +421,7 @@ impl Broadcast { } /// Handles a received `Echo` message. - fn handle_echo( - &mut self, - sender_id: &NodeUid, - p: Proof>, - ) -> BroadcastResult> { + fn handle_echo(&mut self, sender_id: &NodeUid, p: Proof>) -> Result> { let mut fault_log = FaultLog::new(); // If the sender has already sent `Echo`, ignore. if self.echos.contains_key(sender_id) { @@ -468,7 +457,7 @@ impl Broadcast { } /// Handles a received `Ready` message. - fn handle_ready(&mut self, sender_id: &NodeUid, hash: &[u8]) -> BroadcastResult<()> { + fn handle_ready(&mut self, sender_id: &NodeUid, hash: &[u8]) -> Result<()> { // If the sender has already sent a `Ready` before, ignore. if self.readys.contains_key(sender_id) { info!( @@ -491,7 +480,7 @@ impl Broadcast { } /// Sends an `Echo` message and handles it. Does nothing if we are only an observer. - fn send_echo(&mut self, p: Proof>) -> BroadcastResult> { + fn send_echo(&mut self, p: Proof>) -> Result> { self.echo_sent = true; if !self.netinfo.is_validator() { return Ok(FaultLog::new()); @@ -503,7 +492,7 @@ impl Broadcast { } /// Sends a `Ready` message and handles it. Does nothing if we are only an observer. - fn send_ready(&mut self, hash: &[u8]) -> BroadcastResult<()> { + fn send_ready(&mut self, hash: &[u8]) -> Result<()> { self.ready_sent = true; if !self.netinfo.is_validator() { return Ok(()); @@ -516,7 +505,7 @@ impl Broadcast { /// Checks whether the condition for output are met for this hash, and if so, sets the output /// value. - fn compute_output(&mut self, hash: &[u8]) -> BroadcastResult<()> { + fn compute_output(&mut self, hash: &[u8]) -> Result<()> { if self.decided || self.count_readys(hash) <= 2 * self.netinfo.num_faulty() || self.count_echos(hash) <= self.netinfo.num_faulty() @@ -595,7 +584,7 @@ enum Coding { impl Coding { /// Creates a new `Coding` instance with the given number of shards. - fn new(data_shard_num: usize, parity_shard_num: usize) -> BroadcastResult { + fn new(data_shard_num: usize, parity_shard_num: usize) -> Result { Ok(if parity_shard_num > 0 { let rs = ReedSolomon::new(data_shard_num, parity_shard_num)?; Coding::ReedSolomon(Box::new(rs)) @@ -621,7 +610,7 @@ impl Coding { } /// Constructs (and overwrites) the parity shards. - fn encode(&self, slices: &mut [&mut [u8]]) -> BroadcastResult<()> { + fn encode(&self, slices: &mut [&mut [u8]]) -> Result<()> { match *self { Coding::ReedSolomon(ref rs) => rs.encode(slices)?, Coding::Trivial(_) => (), @@ -630,7 +619,7 @@ impl Coding { } /// If enough shards are present, reconstructs the missing ones. - fn reconstruct_shards(&self, shards: &mut [Option>]) -> BroadcastResult<()> { + fn reconstruct_shards(&self, shards: &mut [Option>]) -> Result<()> { match *self { Coding::ReedSolomon(ref rs) => rs.reconstruct_shards(shards)?, Coding::Trivial(_) => { diff --git a/src/common_coin.rs b/src/common_coin.rs index 56cd9ce..80c501b 100644 --- a/src/common_coin.rs +++ b/src/common_coin.rs @@ -28,7 +28,7 @@ use std::sync::Arc; use crypto::error as cerror; use crypto::{Signature, SignatureShare}; use fault_log::{FaultKind, FaultLog}; -use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage}; +use messaging::{self, DistAlgorithm, NetworkInfo, Target}; error_chain! { links { @@ -78,7 +78,7 @@ pub struct CommonCoin { terminated: bool, } -pub type CommonCoinStep = Step; +pub type Step = messaging::Step>; impl DistAlgorithm for CommonCoin where @@ -92,7 +92,7 @@ where type Error = Error; /// Sends our threshold signature share if not yet sent. - fn input(&mut self, _input: Self::Input) -> Result> { + fn input(&mut self, _input: Self::Input) -> Result> { let fault_log = if !self.had_input { self.had_input = true; self.get_coin()? @@ -107,7 +107,7 @@ where &mut self, sender_id: &Self::NodeUid, message: Self::Message, - ) -> Result> { + ) -> Result> { let fault_log = if !self.terminated { let CommonCoinMessage(share) = message; self.handle_share(sender_id, share)? @@ -117,13 +117,6 @@ where self.step(fault_log) } - /// Takes the next share of a threshold signature message for multicasting to all other nodes. - fn next_message(&mut self) -> Option> { - self.messages - .pop_front() - .map(|msg| Target::All.message(msg)) - } - /// Whether the algorithm has terminated. fn terminated(&self) -> bool { self.terminated @@ -151,10 +144,14 @@ where } } - fn step(&mut self, fault_log: FaultLog) -> Result> { + fn step(&mut self, fault_log: FaultLog) -> Result> { Ok(Step::new( self.output.take().into_iter().collect(), fault_log, + self.messages + .drain(..) + .map(|msg| Target::All.message(msg)) + .collect(), )) } diff --git a/src/common_subset.rs b/src/common_subset.rs index 475521b..d9e1bc0 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -25,20 +25,17 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::fmt::Debug; +use std::result; use std::sync::Arc; -use agreement::{self, Agreement, AgreementMessage, AgreementStep}; -use broadcast::{self, Broadcast, BroadcastMessage, BroadcastStep}; +use agreement::{self, Agreement, AgreementMessage}; +use broadcast::{self, Broadcast, BroadcastMessage}; use fault_log::FaultLog; use fmt::HexBytes; -use messaging::{DistAlgorithm, NetworkInfo, Step, TargetedMessage}; +use messaging::{self, DistAlgorithm, NetworkInfo, TargetedMessage}; use rand::Rand; error_chain!{ - types { - Error, ErrorKind, ResultExt, CommonSubsetResult; - } - links { Agreement(agreement::Error, agreement::ErrorKind); Broadcast(broadcast::Error, broadcast::ErrorKind); @@ -70,19 +67,27 @@ struct MessageQueue(VecDeque, No impl MessageQueue { /// Appends to the queue the messages from `agr`, wrapped with `proposer_id`. - fn extend_agreement(&mut self, proposer_id: &NodeUid, agr: &mut Agreement) { + fn extend_agreement( + &mut self, + proposer_id: &NodeUid, + msgs: &mut VecDeque>, + ) { let convert = |msg: TargetedMessage| { msg.map(|a_msg| Message::Agreement(proposer_id.clone(), a_msg)) }; - self.extend(agr.message_iter().map(convert)); + self.extend(msgs.drain(..).map(convert)); } /// Appends to the queue the messages from `bc`, wrapped with `proposer_id`. - fn extend_broadcast(&mut self, proposer_id: &NodeUid, bc: &mut Broadcast) { + fn extend_broadcast( + &mut self, + proposer_id: &NodeUid, + msgs: &mut VecDeque>, + ) { let convert = |msg: TargetedMessage| { msg.map(|b_msg| Message::Broadcast(proposer_id.clone(), b_msg)) }; - self.extend(bc.message_iter().map(convert)); + self.extend(msgs.drain(..).map(convert)); } } @@ -102,7 +107,7 @@ pub struct CommonSubset { decided: bool, } -pub type CommonSubsetStep = Step>; +pub type Step = messaging::Step>; impl DistAlgorithm for CommonSubset { type NodeUid = NodeUid; @@ -111,7 +116,7 @@ impl DistAlgorithm for CommonSubset; type Error = Error; - fn input(&mut self, input: Self::Input) -> CommonSubsetResult> { + fn input(&mut self, input: Self::Input) -> Result> { debug!( "{:?} Proposing {:?}", self.netinfo.our_uid(), @@ -125,7 +130,7 @@ impl DistAlgorithm for CommonSubset CommonSubsetResult> { + ) -> Result> { let fault_log = match message { Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg)?, Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, a_msg)?, @@ -133,10 +138,6 @@ impl DistAlgorithm for CommonSubset Option> { - self.messages.pop_front() - } - fn terminated(&self) -> bool { self.messages.is_empty() && self.agreement_instances.values().all(Agreement::terminated) } @@ -147,7 +148,7 @@ impl DistAlgorithm for CommonSubset CommonSubset { - pub fn new(netinfo: Arc>, session_id: u64) -> CommonSubsetResult { + pub fn new(netinfo: Arc>, session_id: u64) -> Result { // Create all broadcast instances. let mut broadcast_instances: BTreeMap> = BTreeMap::new(); for proposer_id in netinfo.all_uids() { @@ -178,22 +179,17 @@ impl CommonSubset { }) } - fn step( - &mut self, - fault_log: FaultLog, - ) -> CommonSubsetResult> { + fn step(&mut self, fault_log: FaultLog) -> Result> { Ok(Step::new( self.output.take().into_iter().collect(), fault_log, + self.messages.drain(..).collect(), )) } /// Common Subset input message handler. It receives a value for broadcast /// and redirects it to the corresponding broadcast instance. - pub fn send_proposed_value( - &mut self, - value: ProposedValue, - ) -> CommonSubsetResult> { + pub fn send_proposed_value(&mut self, value: ProposedValue) -> Result> { if !self.netinfo.is_validator() { return Ok(FaultLog::new()); } @@ -209,7 +205,7 @@ impl CommonSubset { sender_id: &NodeUid, proposer_id: &NodeUid, bmessage: BroadcastMessage, - ) -> CommonSubsetResult> { + ) -> Result> { self.process_broadcast(proposer_id, |bc| bc.handle_message(sender_id, bmessage)) } @@ -220,7 +216,7 @@ impl CommonSubset { sender_id: &NodeUid, proposer_id: &NodeUid, amessage: AgreementMessage, - ) -> CommonSubsetResult> { + ) -> Result> { // Send the message to the local instance of Agreement self.process_agreement(proposer_id, |agreement| { agreement.handle_message(sender_id, amessage) @@ -229,13 +225,10 @@ impl CommonSubset { /// Upon delivery of v_j from RBC_j, if input has not yet been provided to /// BA_j, then provide input 1 to BA_j. See Figure 11. - fn process_broadcast( - &mut self, - proposer_id: &NodeUid, - f: F, - ) -> CommonSubsetResult> + fn process_broadcast(&mut self, proposer_id: &NodeUid, f: F) -> Result> where - F: FnOnce(&mut Broadcast) -> Result, broadcast::Error>, + F: FnOnce(&mut Broadcast) + -> result::Result, broadcast::Error>, { let mut fault_log = FaultLog::new(); let value = { @@ -243,9 +236,10 @@ impl CommonSubset { .broadcast_instances .get_mut(proposer_id) .ok_or(ErrorKind::NoSuchBroadcastInstance)?; - let step = f(broadcast)?; + let mut step = f(broadcast)?; fault_log.extend(step.fault_log); - self.messages.extend_broadcast(&proposer_id, broadcast); + self.messages + .extend_broadcast(&proposer_id, &mut step.messages); if let Some(output) = step.output.into_iter().next() { output } else { @@ -257,7 +251,7 @@ impl CommonSubset { if agreement.accepts_input() { agreement.input(true) } else { - Ok(Step::default()) + Ok(agreement::Step::default()) } }; self.process_agreement(proposer_id, set_agreement_input)? @@ -267,13 +261,10 @@ impl CommonSubset { /// Callback to be invoked on receipt of the decision value of the Agreement /// instance `uid`. - fn process_agreement( - &mut self, - proposer_id: &NodeUid, - f: F, - ) -> CommonSubsetResult> + fn process_agreement(&mut self, proposer_id: &NodeUid, f: F) -> Result> where - F: FnOnce(&mut Agreement) -> Result, agreement::Error>, + F: FnOnce(&mut Agreement) + -> result::Result, agreement::Error>, { let mut fault_log = FaultLog::new(); let value = { @@ -284,9 +275,10 @@ impl CommonSubset { if agreement.terminated() { return Ok(fault_log); } - let step = f(agreement)?; + let mut step = f(agreement)?; fault_log.extend(step.fault_log); - self.messages.extend_agreement(proposer_id, agreement); + self.messages + .extend_agreement(proposer_id, &mut step.messages); if let Some(output) = step.output.into_iter().next() { output } else { @@ -311,9 +303,9 @@ impl CommonSubset { // input 0 to each instance of BA that has not yet been provided input. for (uid, agreement) in &mut self.agreement_instances { if agreement.accepts_input() { - let step = agreement.input(false)?; + let mut step = agreement.input(false)?; fault_log.extend(step.fault_log); - self.messages.extend_agreement(uid, agreement); + self.messages.extend_agreement(uid, &mut step.messages); if let Some(output) = step.output.into_iter().next() { if self.agreement_results.insert(uid.clone(), output).is_some() { return Err(ErrorKind::MultipleAgreementResults.into()); diff --git a/src/dynamic_honey_badger/builder.rs b/src/dynamic_honey_badger/builder.rs index e559ea3..ee67764 100644 --- a/src/dynamic_honey_badger/builder.rs +++ b/src/dynamic_honey_badger/builder.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::default::Default; use std::fmt::Debug; use std::hash::Hash; use std::iter::once; @@ -8,7 +8,7 @@ use std::sync::Arc; use rand::{self, Rand, Rng}; use serde::{Deserialize, Serialize}; -use super::{ChangeState, DynamicHoneyBadger, JoinPlan, MessageQueue, Result, VoteCounter}; +use super::{ChangeState, DynamicHoneyBadger, JoinPlan, Result, Step, VoteCounter}; use crypto::{SecretKey, SecretKeySet, SecretKeyShare}; use honey_badger::HoneyBadger; use messaging::NetworkInfo; @@ -16,15 +16,19 @@ use messaging::NetworkInfo; /// A Dynamic Honey Badger builder, to configure the parameters and create new instances of /// `DynamicHoneyBadger`. pub struct DynamicHoneyBadgerBuilder { - /// Shared network data. - netinfo: NetworkInfo, - /// The epoch at which to join the network. - start_epoch: u64, - /// The current change, for which key generation is beginning at `start_epoch`. - change: ChangeState, /// The maximum number of future epochs for which we handle messages simultaneously. max_future_epochs: usize, - _phantom: PhantomData, + _phantom: PhantomData<(C, NodeUid)>, +} + +impl Default for DynamicHoneyBadgerBuilder { + fn default() -> Self { + // TODO: Use the defaults from `HoneyBadgerBuilder`. + DynamicHoneyBadgerBuilder { + max_future_epochs: 3, + _phantom: PhantomData, + } + } } impl DynamicHoneyBadgerBuilder @@ -34,51 +38,8 @@ where { /// Returns a new `DynamicHoneyBadgerBuilder` configured to use the node IDs and cryptographic /// keys specified by `netinfo`. - pub fn new(netinfo: NetworkInfo) -> Self { - // TODO: Use the defaults from `HoneyBadgerBuilder`. - DynamicHoneyBadgerBuilder { - netinfo, - start_epoch: 0, - change: ChangeState::None, - max_future_epochs: 3, - _phantom: PhantomData, - } - } - - /// Returns a new `DynamicHoneyBadgerBuilder` configured to start a new network as a single - /// validator. - pub fn new_first_node(our_uid: NodeUid) -> Self { - let mut rng = rand::thread_rng(); - let sk_set = SecretKeySet::random(0, &mut rng); - let pk_set = sk_set.public_keys(); - let sks = sk_set.secret_key_share(0); - let sk: SecretKey = rng.gen(); - let pub_keys = once((our_uid.clone(), sk.public_key())).collect(); - let netinfo = NetworkInfo::new(our_uid, sks, pk_set, sk, pub_keys); - DynamicHoneyBadgerBuilder::new(netinfo) - } - - /// Returns a new `DynamicHoneyBadgerBuilder` configured to join the network at the epoch - /// specified in the `JoinPlan`. - pub fn new_joining( - our_uid: NodeUid, - secret_key: SecretKey, - join_plan: JoinPlan, - ) -> Self { - let netinfo = NetworkInfo::new( - our_uid, - SecretKeyShare::default(), // TODO: Should be an option? - join_plan.pub_key_set, - secret_key, - join_plan.pub_keys, - ); - DynamicHoneyBadgerBuilder { - netinfo, - start_epoch: join_plan.epoch, - change: join_plan.change, - max_future_epochs: 3, - _phantom: PhantomData, - } + pub fn new() -> Self { + Self::default() } /// Sets the maximum number of future epochs for which we handle messages simultaneously. @@ -88,26 +49,68 @@ where } /// Creates a new Dynamic Honey Badger instance with an empty buffer. - pub fn build(&self) -> Result> { - let netinfo = Arc::new(self.netinfo.clone()); - let honey_badger = HoneyBadger::builder(netinfo.clone()) + pub fn build(&self, netinfo: NetworkInfo) -> DynamicHoneyBadger { + let arc_netinfo = Arc::new(netinfo.clone()); + let honey_badger = HoneyBadger::builder(arc_netinfo.clone()) .max_future_epochs(self.max_future_epochs) .build(); - let mut dhb = DynamicHoneyBadger { - netinfo: self.netinfo.clone(), + DynamicHoneyBadger { + netinfo, max_future_epochs: self.max_future_epochs, - start_epoch: self.start_epoch, - vote_counter: VoteCounter::new(netinfo, self.start_epoch), + start_epoch: 0, + vote_counter: VoteCounter::new(arc_netinfo, 0), key_gen_msg_buffer: Vec::new(), honey_badger, key_gen: None, incoming_queue: Vec::new(), - messages: MessageQueue(VecDeque::new()), - output: VecDeque::new(), - }; - if let ChangeState::InProgress(ref change) = self.change { - dhb.update_key_gen(self.start_epoch, change)?; } - Ok(dhb) + } + + /// Creates a new `DynamicHoneyBadger` configured to start a new network as a single validator. + pub fn build_first_node(&self, our_uid: NodeUid) -> DynamicHoneyBadger { + let mut rng = rand::thread_rng(); + let sk_set = SecretKeySet::random(0, &mut rng); + let pk_set = sk_set.public_keys(); + let sks = sk_set.secret_key_share(0); + let sk: SecretKey = rng.gen(); + let pub_keys = once((our_uid.clone(), sk.public_key())).collect(); + let netinfo = NetworkInfo::new(our_uid, sks, pk_set, sk, pub_keys); + self.build(netinfo) + } + + /// Creates a new `DynamicHoneyBadger` configured to join the network at the epoch specified in + /// the `JoinPlan`. + pub fn build_joining( + &self, + our_uid: NodeUid, + secret_key: SecretKey, + join_plan: JoinPlan, + ) -> Result<(DynamicHoneyBadger, Step)> { + let netinfo = NetworkInfo::new( + our_uid, + SecretKeyShare::default(), // TODO: Should be an option? + join_plan.pub_key_set, + secret_key, + join_plan.pub_keys, + ); + let arc_netinfo = Arc::new(netinfo.clone()); + let honey_badger = HoneyBadger::builder(arc_netinfo.clone()) + .max_future_epochs(self.max_future_epochs) + .build(); + let mut dhb = DynamicHoneyBadger { + netinfo, + max_future_epochs: self.max_future_epochs, + start_epoch: join_plan.epoch, + vote_counter: VoteCounter::new(arc_netinfo, join_plan.epoch), + key_gen_msg_buffer: Vec::new(), + honey_badger, + key_gen: None, + incoming_queue: Vec::new(), + }; + let step = match join_plan.change { + ChangeState::InProgress(ref change) => dhb.update_key_gen(join_plan.epoch, change)?, + ChangeState::None | ChangeState::Complete(..) => Step::default(), + }; + Ok((dhb, step)) } } diff --git a/src/dynamic_honey_badger/mod.rs b/src/dynamic_honey_badger/mod.rs index 5e8b468..968e95c 100644 --- a/src/dynamic_honey_badger/mod.rs +++ b/src/dynamic_honey_badger/mod.rs @@ -56,7 +56,7 @@ //! majority before that happens, key generation resets again, and is attempted for the new change. use rand::Rand; -use std::collections::{BTreeMap, VecDeque}; +use std::collections::BTreeMap; use std::fmt::Debug; use std::hash::Hash; use std::mem; @@ -66,10 +66,10 @@ use bincode; use serde::{Deserialize, Serialize}; use self::votes::{SignedVote, VoteCounter}; -use crypto::{PublicKey, PublicKeySet, SecretKey, Signature}; +use crypto::{PublicKey, PublicKeySet, Signature}; use fault_log::{FaultKind, FaultLog}; -use honey_badger::{HoneyBadger, HoneyBadgerStep, Message as HbMessage}; -use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage}; +use honey_badger::{self, HoneyBadger, Message as HbMessage}; +use messaging::{self, DistAlgorithm, NetworkInfo, Target}; use sync_key_gen::{Ack, Part, PartOutcome, SyncKeyGen}; pub use self::batch::Batch; @@ -110,13 +110,9 @@ pub struct DynamicHoneyBadger { key_gen: Option<(SyncKeyGen, Change)>, /// A queue for messages from future epochs that cannot be handled yet. incoming_queue: Vec<(NodeUid, Message)>, - /// The messages that need to be sent to other nodes. - messages: MessageQueue, - /// The outputs from completed epochs. - output: VecDeque>, } -pub type DynamicHoneyBadgerStep = Step>; +pub type Step = messaging::Step>; impl DistAlgorithm for DynamicHoneyBadger where @@ -129,48 +125,44 @@ where type Message = Message; type Error = Error; - fn input(&mut self, input: Self::Input) -> Result> { + fn input(&mut self, input: Self::Input) -> Result> { // User contributions are forwarded to `HoneyBadger` right away. Votes are signed and // broadcast. - let fault_log = match input { - Input::User(contrib) => self.propose(contrib)?, - Input::Change(change) => self.vote_for(change).map(|()| FaultLog::new())?, - }; - self.step(fault_log) + match input { + Input::User(contrib) => self.propose(contrib), + Input::Change(change) => self.vote_for(change), + } } fn handle_message( &mut self, sender_id: &NodeUid, message: Self::Message, - ) -> Result> { + ) -> Result> { let epoch = message.start_epoch(); - let fault_log = if epoch < self.start_epoch { + if epoch < self.start_epoch { // Obsolete message. - FaultLog::new() + Ok(Step::default()) } else if epoch > self.start_epoch { // Message cannot be handled yet. Save it for later. let entry = (sender_id.clone(), message); self.incoming_queue.push(entry); - FaultLog::new() + Ok(Step::default()) } else { match message { Message::HoneyBadger(_, hb_msg) => { - self.handle_honey_badger_message(sender_id, hb_msg)? + self.handle_honey_badger_message(sender_id, hb_msg) } Message::KeyGen(_, kg_msg, sig) => { - self.handle_key_gen_message(sender_id, kg_msg, *sig)? - } - Message::SignedVote(signed_vote) => { - self.vote_counter.add_pending_vote(sender_id, signed_vote)? + self.handle_key_gen_message(sender_id, kg_msg, *sig)?; + Ok(Step::default()) } + Message::SignedVote(signed_vote) => self + .vote_counter + .add_pending_vote(sender_id, signed_vote) + .map(FaultLog::into), } - }; - self.step(fault_log) - } - - fn next_message(&mut self) -> Option> { - self.messages.pop_front() + } } fn terminated(&self) -> bool { @@ -187,30 +179,9 @@ where C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash, NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash + Rand, { - fn step(&mut self, fault_log: FaultLog) -> Result> { - Ok(Step::new(self.output.drain(..).collect(), fault_log)) - } - - /// Returns a new `DynamicHoneyBadgerBuilder` configured to use the node IDs and cryptographic - /// keys specified by `netinfo`. - pub fn builder(netinfo: NetworkInfo) -> DynamicHoneyBadgerBuilder { - DynamicHoneyBadgerBuilder::new(netinfo) - } - - /// Returns a new `DynamicHoneyBadgerBuilder` configured to start a new network as the first - /// node. - pub fn first_node_builder(our_uid: NodeUid) -> DynamicHoneyBadgerBuilder { - DynamicHoneyBadgerBuilder::new_first_node(our_uid) - } - - /// Returns a new `DynamicHoneyBadgerBuilder` configured to join the network at the epoch - /// specified in the `JoinPlan`. - pub fn joining_builder( - our_uid: NodeUid, - secret_key: SecretKey, - join_plan: JoinPlan, - ) -> DynamicHoneyBadgerBuilder { - DynamicHoneyBadgerBuilder::new_joining(our_uid, secret_key, join_plan) + /// Returns a new `DynamicHoneyBadgerBuilder`. + pub fn builder() -> DynamicHoneyBadgerBuilder { + DynamicHoneyBadgerBuilder::new() } /// Returns `true` if input for the current epoch has already been provided. @@ -219,7 +190,7 @@ where } /// Proposes a contribution in the current epoch. - pub fn propose(&mut self, contrib: C) -> Result> { + pub fn propose(&mut self, contrib: C) -> Result> { let step = self.honey_badger.input(InternalContrib { contrib, key_gen_messages: self.key_gen_msg_buffer.clone(), @@ -229,14 +200,13 @@ where } /// Cast a vote to change the set of validators. - pub fn vote_for(&mut self, change: Change) -> Result<()> { + pub fn vote_for(&mut self, change: Change) -> Result> { if !self.netinfo.is_validator() { - return Ok(()); // TODO: Return an error? + return Ok(Step::default()); // TODO: Return an error? } let signed_vote = self.vote_counter.sign_vote_for(change)?.clone(); let msg = Message::SignedVote(signed_vote); - self.messages.push_back(Target::All.message(msg)); - Ok(()) + Ok(Target::All.message(msg).into()) } /// Returns the information about the node IDs in the network, and the cryptographic keys. @@ -249,12 +219,12 @@ where &mut self, sender_id: &NodeUid, message: HbMessage, - ) -> Result> { + ) -> Result> { if !self.netinfo.is_node_validator(sender_id) { info!("Unknown sender {:?} of message {:?}", sender_id, message); return Err(ErrorKind::UnknownSender.into()); } - // Handle the message and put the outgoing messages into the queue. + // Handle the message. let step = self.honey_badger.handle_message(sender_id, message)?; self.process_output(step) } @@ -266,22 +236,22 @@ where sender_id: &NodeUid, kg_msg: KeyGenMessage, sig: Signature, - ) -> Result> { + ) -> Result<()> { self.verify_signature(sender_id, &sig, &kg_msg)?; let tx = SignedKeyGenMsg(self.start_epoch, sender_id.clone(), kg_msg, sig); self.key_gen_msg_buffer.push(tx); - Ok(FaultLog::default()) + Ok(()) } /// Processes all pending batches output by Honey Badger. fn process_output( &mut self, - step: HoneyBadgerStep, NodeUid>, - ) -> Result> { - let mut fault_log = FaultLog::new(); - fault_log.extend(step.fault_log); + hb_step: honey_badger::Step, NodeUid>, + ) -> Result> { + let mut step: Step = Step::default(); let start_epoch = self.start_epoch; - for hb_batch in step.output { + let output = step.extend_with(hb_step, |hb_msg| Message::HoneyBadger(start_epoch, hb_msg)); + for hb_batch in output { // Create the batch we output ourselves. It will contain the _user_ transactions of // `hb_batch`, and the current change state. let mut batch = Batch::new(hb_batch.epoch + self.start_epoch); @@ -293,7 +263,8 @@ where key_gen_messages, contrib, } = int_contrib; - fault_log.extend(self.vote_counter.add_committed_votes(&id, votes)?); + step.fault_log + .extend(self.vote_counter.add_committed_votes(&id, votes)?); batch.contributions.insert(id, contrib); self.key_gen_msg_buffer .retain(|skgm| !key_gen_messages.contains(skgm)); @@ -305,13 +276,13 @@ where if !self.verify_signature(&s_id, &sig, &kg_msg)? { info!("Invalid signature from {:?} for: {:?}.", s_id, kg_msg); let fault_kind = FaultKind::InvalidKeyGenMessageSignature; - fault_log.append(s_id.clone(), fault_kind); + step.fault_log.append(s_id.clone(), fault_kind); continue; } - match kg_msg { + step.extend(match kg_msg { KeyGenMessage::Part(part) => self.handle_part(&s_id, part)?, - KeyGenMessage::Ack(ack) => self.handle_ack(&s_id, ack)?, - }.merge_into(&mut fault_log); + KeyGenMessage::Ack(ack) => self.handle_ack(&s_id, ack)?.into(), + }); } } @@ -323,30 +294,26 @@ where batch.set_change(ChangeState::Complete(change), &self.netinfo); } else if let Some(change) = self.vote_counter.compute_majority().cloned() { // If there is a majority, restart DKG. Inform the user about the current change. - self.update_key_gen(batch.epoch + 1, &change)?; + step.extend(self.update_key_gen(batch.epoch + 1, &change)?); batch.set_change(ChangeState::InProgress(change), &self.netinfo); } - self.output.push_back(batch); + step.output.push_back(batch); } - self.messages - .extend_with_epoch(self.start_epoch, &mut self.honey_badger); // If `start_epoch` changed, we can now handle some queued messages. if start_epoch < self.start_epoch { let queue = mem::replace(&mut self.incoming_queue, Vec::new()); for (sender_id, msg) in queue { - let rec_step = self.handle_message(&sender_id, msg)?; - self.output.extend(rec_step.output); - fault_log.extend(rec_step.fault_log); + step.extend(self.handle_message(&sender_id, msg)?); } } - Ok(fault_log) + Ok(step) } /// If the majority of votes has changed, restarts Key Generation for the set of nodes implied /// by the current change. - fn update_key_gen(&mut self, epoch: u64, change: &Change) -> Result<()> { + fn update_key_gen(&mut self, epoch: u64, change: &Change) -> Result> { if self.key_gen.as_ref().map(|&(_, ref ch)| ch) == Some(change) { - return Ok(()); // The change is the same as before. Continue DKG as is. + return Ok(Step::default()); // The change is the same as before. Continue DKG as is. } debug!("{:?} Restarting DKG for {:?}.", self.our_id(), change); // Use the existing key shares - with the change applied - as keys for DKG. @@ -366,16 +333,14 @@ where let (key_gen, part) = SyncKeyGen::new(our_uid, sk, pub_keys, threshold); self.key_gen = Some((key_gen, change.clone())); if let Some(part) = part { - self.send_transaction(KeyGenMessage::Part(part))?; + self.send_transaction(KeyGenMessage::Part(part)) + } else { + Ok(Step::default()) } - Ok(()) } /// Starts a new `HoneyBadger` instance and resets the vote counter. fn restart_honey_badger(&mut self, epoch: u64) { - // TODO: Filter out the messages for `epoch` and later. - self.messages - .extend_with_epoch(self.start_epoch, &mut self.honey_badger); self.start_epoch = epoch; self.key_gen_msg_buffer.retain(|kg_msg| kg_msg.0 >= epoch); let netinfo = Arc::new(self.netinfo.clone()); @@ -387,17 +352,14 @@ where } /// Handles a `Part` message that was output by Honey Badger. - fn handle_part(&mut self, sender_id: &NodeUid, part: Part) -> Result> { + fn handle_part(&mut self, sender_id: &NodeUid, part: Part) -> Result> { let handle = |&mut (ref mut key_gen, _): &mut (SyncKeyGen, _)| { key_gen.handle_part(&sender_id, part) }; match self.key_gen.as_mut().and_then(handle) { - Some(PartOutcome::Valid(ack)) => { - self.send_transaction(KeyGenMessage::Ack(ack))?; - Ok(FaultLog::new()) - } - Some(PartOutcome::Invalid(fault_log)) => Ok(fault_log), - None => Ok(FaultLog::new()), + Some(PartOutcome::Valid(ack)) => self.send_transaction(KeyGenMessage::Ack(ack)), + Some(PartOutcome::Invalid(fault_log)) => Ok(fault_log.into()), + None => Ok(Step::default()), } } @@ -411,18 +373,17 @@ where } /// Signs and sends a `KeyGenMessage` and also tries to commit it. - fn send_transaction(&mut self, kg_msg: KeyGenMessage) -> Result<()> { + fn send_transaction(&mut self, kg_msg: KeyGenMessage) -> Result> { let ser = bincode::serialize(&kg_msg)?; let sig = Box::new(self.netinfo.secret_key().sign(ser)); - let msg = Message::KeyGen(self.start_epoch, kg_msg.clone(), sig.clone()); - self.messages.push_back(Target::All.message(msg)); - if !self.netinfo.is_validator() { - return Ok(()); + if self.netinfo.is_validator() { + let our_uid = self.netinfo.our_uid().clone(); + let signed_msg = + SignedKeyGenMsg(self.start_epoch, our_uid, kg_msg.clone(), *sig.clone()); + self.key_gen_msg_buffer.push(signed_msg); } - let our_uid = self.netinfo.our_uid().clone(); - let signed_msg = SignedKeyGenMsg(self.start_epoch, our_uid, kg_msg, *sig); - self.key_gen_msg_buffer.push(signed_msg); - Ok(()) + let msg = Message::KeyGen(self.start_epoch, kg_msg, sig); + Ok(Target::All.message(msg).into()) } /// If the current Key Generation process is ready, returns the `SyncKeyGen`. @@ -520,26 +481,6 @@ impl Message { } } -/// The queue of outgoing messages in a `HoneyBadger` instance. -#[derive(Deref, DerefMut)] -struct MessageQueue(VecDeque, NodeUid>>); - -impl MessageQueue -where - NodeUid: Eq + Hash + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Rand, -{ - /// Appends to the queue the messages from `hb`, wrapped with `epoch`. - fn extend_with_epoch(&mut self, epoch: u64, hb: &mut HoneyBadger) - where - Tx: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash, - { - let convert = |msg: TargetedMessage, NodeUid>| { - msg.map(|hb_msg| Message::HoneyBadger(epoch, hb_msg)) - }; - self.extend(hb.message_iter().map(convert)); - } -} - /// The information a new node requires to join the network as an observer. It contains the state /// of voting and key generation after a specific epoch, so that the new node will be in sync if it /// joins in the next one. diff --git a/src/honey_badger.rs b/src/honey_badger.rs index 4394e38..4d4c572 100644 --- a/src/honey_badger.rs +++ b/src/honey_badger.rs @@ -34,16 +34,12 @@ use bincode; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use common_subset::{self, CommonSubset, CommonSubsetStep}; +use common_subset::{self, CommonSubset}; use crypto::{Ciphertext, DecryptionShare}; use fault_log::{FaultKind, FaultLog}; -use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage}; +use messaging::{self, DistAlgorithm, NetworkInfo, Target, TargetedMessage}; error_chain!{ - types { - Error, ErrorKind, ResultExt, HoneyBadgerResult; - } - links { CommonSubset(common_subset::Error, common_subset::ErrorKind); } @@ -134,7 +130,7 @@ pub struct HoneyBadger { ciphertexts: BTreeMap>, } -pub type HoneyBadgerStep = Step>; +pub type Step = messaging::Step>; impl DistAlgorithm for HoneyBadger where @@ -147,7 +143,7 @@ where type Message = Message; type Error = Error; - fn input(&mut self, input: Self::Input) -> HoneyBadgerResult> { + fn input(&mut self, input: Self::Input) -> Result> { let fault_log = self.propose(&input)?; self.step(fault_log) } @@ -156,7 +152,7 @@ where &mut self, sender_id: &NodeUid, message: Self::Message, - ) -> HoneyBadgerResult> { + ) -> Result> { if !self.netinfo.is_node_validator(sender_id) { return Err(ErrorKind::UnknownSender.into()); } @@ -174,10 +170,6 @@ where self.step(fault_log) } - fn next_message(&mut self) -> Option> { - self.messages.pop_front() - } - fn terminated(&self) -> bool { false } @@ -198,15 +190,16 @@ where HoneyBadgerBuilder::new(netinfo) } - fn step( - &mut self, - fault_log: FaultLog, - ) -> HoneyBadgerResult> { - Ok(Step::new(self.output.drain(..).collect(), fault_log)) + fn step(&mut self, fault_log: FaultLog) -> Result> { + Ok(Step::new( + self.output.drain(..).collect(), + fault_log, + self.messages.drain(..).collect(), + )) } /// Proposes a new item in the current epoch. - pub fn propose(&mut self, proposal: &C) -> HoneyBadgerResult> { + pub fn propose(&mut self, proposal: &C) -> Result> { if !self.netinfo.is_validator() { return Ok(FaultLog::new()); } @@ -220,11 +213,9 @@ where let ser_prop = bincode::serialize(&proposal)?; let ciphertext = self.netinfo.public_key_set().public_key().encrypt(ser_prop); self.has_input = true; - let step = cs.input(bincode::serialize(&ciphertext).unwrap())?; - self.messages.extend_with_epoch(self.epoch, cs); - step + cs.input(bincode::serialize(&ciphertext).unwrap())? }; - Ok(self.process_output(step)?) + Ok(self.process_output(step, None)?) } /// Returns `true` if input for the current epoch has already been provided. @@ -238,7 +229,7 @@ where sender_id: &NodeUid, epoch: u64, content: MessageContent, - ) -> HoneyBadgerResult> { + ) -> Result> { match content { MessageContent::CommonSubset(cs_msg) => { self.handle_common_subset_message(sender_id, epoch, cs_msg) @@ -255,7 +246,7 @@ where sender_id: &NodeUid, epoch: u64, message: common_subset::Message, - ) -> HoneyBadgerResult> { + ) -> Result> { let mut fault_log = FaultLog::new(); let step = { // Borrow the instance for `epoch`, or create it. @@ -270,15 +261,9 @@ where } } }; - // Handle the message and put the outgoing messages into the queue. - let cs_step = cs.handle_message(sender_id, message)?; - self.messages.extend_with_epoch(epoch, cs); - cs_step + cs.handle_message(sender_id, message)? }; - // If this is the current epoch, the message could cause a new output. - if epoch == self.epoch { - fault_log.extend(self.process_output(step)?); - } + fault_log.extend(self.process_output(step, Some(epoch))?); self.remove_terminated(epoch); Ok(fault_log) } @@ -290,7 +275,7 @@ where epoch: u64, proposer_id: NodeUid, share: DecryptionShare, - ) -> HoneyBadgerResult> { + ) -> Result> { let mut fault_log = FaultLog::new(); if let Some(ciphertext) = self @@ -339,7 +324,7 @@ where /// When contributions of transactions have been decrypted for all valid proposers in this /// epoch, moves those contributions into a batch, outputs the batch and updates the epoch. - fn try_output_batch(&mut self) -> HoneyBadgerResult> { + fn try_output_batch(&mut self) -> Result> { // Wait until contributions have been successfully decoded for all proposer nodes with correct // ciphertext outputs. if !self.all_contributions_decrypted() { @@ -379,7 +364,7 @@ where } /// Increments the epoch number and clears any state that is local to the finished epoch. - fn update_epoch(&mut self) -> HoneyBadgerResult> { + fn update_epoch(&mut self) -> Result> { // Clear the state of the old epoch. self.ciphertexts.remove(&self.epoch); self.decrypted_contributions.clear(); @@ -402,7 +387,7 @@ where } /// Tries to decrypt contributions from all proposers and output those in a batch. - fn try_decrypt_and_output_batch(&mut self) -> HoneyBadgerResult> { + fn try_decrypt_and_output_batch(&mut self) -> Result> { // Return if we don't have ciphertexts yet. let proposer_ids: Vec<_> = match self.ciphertexts.get(&self.epoch) { Some(cts) => cts.keys().cloned().collect(), @@ -474,7 +459,7 @@ where fn send_decryption_shares( &mut self, cs_output: BTreeMap>, - ) -> HoneyBadgerResult> { + ) -> Result> { let mut fault_log = FaultLog::new(); let mut ciphertexts = BTreeMap::new(); for (proposer_id, v) in cs_output { @@ -512,7 +497,7 @@ where &mut self, proposer_id: &NodeUid, ciphertext: &Ciphertext, - ) -> HoneyBadgerResult<(bool, FaultLog)> { + ) -> Result<(bool, FaultLog)> { if !self.netinfo.is_validator() { return Ok((ciphertext.verify(), FaultLog::new())); } @@ -576,18 +561,27 @@ where } } - /// Checks whether the current epoch has output, and if it does, sends out our decryption shares. + /// Checks whether the current epoch has output, and if it does, sends out our decryption + /// shares. The `epoch` argument allows to differentiate between calls which produce output in + /// all conditions, `epoch == None`, and calls which only produce output in a given epoch, + /// `epoch == Some(given_epoch)`. fn process_output( &mut self, - step: CommonSubsetStep, - ) -> HoneyBadgerResult> { - let Step { + step: common_subset::Step, + epoch: Option, + ) -> Result> { + let common_subset::Step { output, mut fault_log, + mut messages, } = step; - for cs_output in output { - fault_log.extend(self.send_decryption_shares(cs_output)?); - // TODO: May also check that there is no further output from Common Subset. + self.messages.extend_with_epoch(self.epoch, &mut messages); + // If this is the current epoch, the message could cause a new output. + if epoch.is_none() || epoch == Some(self.epoch) { + for cs_output in output { + fault_log.extend(self.send_decryption_shares(cs_output)?); + // TODO: May also check that there is no further output from Common Subset. + } } Ok(fault_log) } @@ -699,10 +693,14 @@ struct MessageQueue(VecDeque, No impl MessageQueue { /// Appends to the queue the messages from `cs`, wrapped with `epoch`. - fn extend_with_epoch(&mut self, epoch: u64, cs: &mut CommonSubset) { + fn extend_with_epoch( + &mut self, + epoch: u64, + msgs: &mut VecDeque, NodeUid>>, + ) { let convert = |msg: TargetedMessage, NodeUid>| { msg.map(|cs_msg| MessageContent::CommonSubset(cs_msg).with_epoch(epoch)) }; - self.extend(cs.message_iter().map(convert)); + self.extend(msgs.drain(..).map(convert)); } } diff --git a/src/messaging.rs b/src/messaging.rs index f5f1725..44e48e4 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -1,5 +1,6 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::fmt::Debug; +use std::iter::once; use crypto::{PublicKey, PublicKeySet, PublicKeyShare, SecretKey, SecretKeyShare}; use fault_log::FaultLog; @@ -52,32 +53,115 @@ impl TargetedMessage { /// Result of one step of the local state machine of a distributed algorithm. Such a result should /// be used and never discarded by the client of the algorithm. #[must_use = "The algorithm step result must be used."] -pub struct Step +pub struct Step where - N: Clone, + D: DistAlgorithm, + ::NodeUid: Clone, { - pub output: VecDeque, - pub fault_log: FaultLog, + pub output: VecDeque, + pub fault_log: FaultLog, + pub messages: VecDeque>, } -impl Default for Step +impl Default for Step where - N: Clone, + D: DistAlgorithm, + ::NodeUid: Clone, { - fn default() -> Step { + fn default() -> Step { Step { - output: Default::default(), + output: VecDeque::default(), fault_log: FaultLog::default(), + messages: VecDeque::default(), } } } -impl Step +impl Step where - N: Clone, + ::NodeUid: Clone, { - pub fn new(output: VecDeque, fault_log: FaultLog) -> Self { - Step { output, fault_log } + /// Creates a new `Step` from the given collections. + pub fn new( + output: VecDeque, + fault_log: FaultLog, + messages: VecDeque>, + ) -> Self { + Step { + output, + fault_log, + messages, + } + } + + /// Converts `self` into a step of another type, given conversion methods for output and + /// messages. + pub fn map(self, f_out: FO, f_msg: FM) -> Step + where + D2: DistAlgorithm, + FO: Fn(D::Output) -> D2::Output, + FM: Fn(D::Message) -> D2::Message, + { + Step { + output: self.output.into_iter().map(f_out).collect(), + fault_log: self.fault_log, + messages: self.messages.into_iter().map(|tm| tm.map(&f_msg)).collect(), + } + } + + /// Extends `self` with `other`s messages and fault logs, and returns `other.output`. + pub fn extend_with(&mut self, other: Step, f_msg: FM) -> VecDeque + where + D2: DistAlgorithm, + FM: Fn(D2::Message) -> D::Message, + { + self.fault_log.extend(other.fault_log); + let msgs = other.messages.into_iter().map(|tm| tm.map(&f_msg)); + self.messages.extend(msgs); + other.output + } + + /// Adds the outputs, fault logs and messages of `other` to `self`. + pub fn extend(&mut self, other: Self) { + self.output.extend(other.output); + self.fault_log.extend(other.fault_log); + self.messages.extend(other.messages); + } + + /// Converts this step into an equivalent step for a different `DistAlgorithm`. + // This cannot be a `From` impl, because it would conflict with `impl From for T`. + pub fn convert(self) -> Step + where + D2: DistAlgorithm, + { + Step { + output: self.output, + fault_log: self.fault_log, + messages: self.messages, + } + } + + /// Returns `true` if there are now messages, faults or outputs. + pub fn is_empty(&self) -> bool { + self.output.is_empty() && self.fault_log.is_empty() && self.messages.is_empty() + } +} + +impl From> for Step { + fn from(fault_log: FaultLog) -> Self { + Step { + fault_log, + ..Step::default() + } + } +} + +impl From> for Step { + fn from(msg: TargetedMessage) -> Self { + Step { + messages: once(msg).collect(), + ..Step::default() + } } } @@ -96,47 +180,24 @@ pub trait DistAlgorithm { type Error: Debug; /// Handles an input provided by the user, and returns - fn input( - &mut self, - input: Self::Input, - ) -> Result, Self::Error>; + fn input(&mut self, input: Self::Input) -> Result, Self::Error> + where + Self: Sized; /// Handles a message received from node `sender_id`. fn handle_message( &mut self, sender_id: &Self::NodeUid, message: Self::Message, - ) -> Result, Self::Error>; - - /// Returns a message that needs to be sent to another node. - fn next_message(&mut self) -> Option>; + ) -> Result, Self::Error> + where + Self: Sized; /// Returns `true` if execution has completed and this instance can be dropped. fn terminated(&self) -> bool; /// Returns this node's own ID. fn our_id(&self) -> &Self::NodeUid; - - /// Returns an iterator over the outgoing messages. - fn message_iter(&mut self) -> MessageIter - where - Self: Sized, - { - MessageIter { algorithm: self } - } -} - -/// An iterator over a distributed algorithm's outgoing messages. -pub struct MessageIter<'a, D: DistAlgorithm + 'a> { - algorithm: &'a mut D, -} - -impl<'a, D: DistAlgorithm + 'a> Iterator for MessageIter<'a, D> { - type Item = TargetedMessage; - - fn next(&mut self) -> Option { - self.algorithm.next_message() - } } /// Common data shared between algorithms: the nodes' IDs and key shares. diff --git a/src/queueing_honey_badger.rs b/src/queueing_honey_badger.rs index 9ddf73a..c6eee20 100644 --- a/src/queueing_honey_badger.rs +++ b/src/queueing_honey_badger.rs @@ -21,7 +21,6 @@ //! the same transaction multiple times. use std::cmp; -use std::collections::VecDeque; use std::fmt::Debug; use std::hash::Hash; use std::marker::PhantomData; @@ -30,8 +29,7 @@ use rand::Rand; use serde::{Deserialize, Serialize}; use dynamic_honey_badger::{self, Batch as DhbBatch, DynamicHoneyBadger, Message}; -use fault_log::FaultLog; -use messaging::{DistAlgorithm, Step, TargetedMessage}; +use messaging::{self, DistAlgorithm}; use transaction_queue::TransactionQueue; pub use dynamic_honey_badger::{Change, ChangeState, Input}; @@ -59,6 +57,8 @@ where { /// Returns a new `QueueingHoneyBadgerBuilder` configured to use the node IDs and cryptographic /// keys specified by `netinfo`. + // TODO: Make it easier to build a `QueueingHoneyBadger` with a `JoinPlan`. Handle `Step` + // conversion internally. pub fn new(dyn_hb: DynamicHoneyBadger, NodeUid>) -> Self { // TODO: Use the defaults from `HoneyBadgerBuilder`. QueueingHoneyBadgerBuilder { @@ -75,7 +75,7 @@ where } /// Creates a new Queueing Honey Badger instance with an empty buffer. - pub fn build(self) -> QueueingHoneyBadger + pub fn build(self) -> (QueueingHoneyBadger, Step) where Tx: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq, { @@ -85,7 +85,10 @@ where /// Returns a new Queueing Honey Badger instance that starts with the given transactions in its /// buffer. - pub fn build_with_transactions(self, txs: TI) -> Result> + pub fn build_with_transactions( + self, + txs: TI, + ) -> Result<(QueueingHoneyBadger, Step)> where TI: IntoIterator, Tx: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq, @@ -95,10 +98,9 @@ where dyn_hb: self.dyn_hb, queue, batch_size: self.batch_size, - output: VecDeque::new(), }; - let _ = qhb.propose()?; // Fault log is empty: no contact with other nodes yet. - Ok(qhb) + let step = qhb.propose()?; + Ok((qhb, step)) } } @@ -115,11 +117,9 @@ where dyn_hb: DynamicHoneyBadger, NodeUid>, /// The queue of pending transactions that haven't been output in a batch yet. queue: TransactionQueue, - /// The outputs from completed epochs. - output: VecDeque>, } -pub type QueueingHoneyBadgerStep = Step>; +pub type Step = messaging::Step>; impl DistAlgorithm for QueueingHoneyBadger where @@ -132,42 +132,32 @@ where type Message = Message; type Error = Error; - fn input(&mut self, input: Self::Input) -> Result> { + fn input(&mut self, input: Self::Input) -> Result> { // User transactions are forwarded to `HoneyBadger` right away. Internal messages are // in addition signed and broadcast. - let fault_log = match input { + match input { Input::User(tx) => { self.queue.0.push_back(tx); - FaultLog::new() + Ok(Step::default()) } - Input::Change(change) => { - let step = self.dyn_hb.input(Input::Change(change))?; - // FIXME: Use the output since `dyn_hb` can output immediately on input. - step.fault_log - } - }; - self.step(fault_log) + Input::Change(change) => Ok(self.dyn_hb.input(Input::Change(change))?.convert()), + } } fn handle_message( &mut self, sender_id: &NodeUid, message: Self::Message, - ) -> Result> { - let Step { - output, - mut fault_log, - } = self.dyn_hb.handle_message(sender_id, message)?; - for batch in output { + ) -> Result> { + let mut step = self + .dyn_hb + .handle_message(sender_id, message)? + .convert::(); + for batch in &step.output { self.queue.remove_all(batch.iter()); - self.output.push_back(batch); } - fault_log.extend(self.propose()?); - self.step(fault_log) - } - - fn next_message(&mut self) -> Option> { - self.dyn_hb.next_message() + step.extend(self.propose()?); + Ok(step) } fn terminated(&self) -> bool { @@ -192,33 +182,22 @@ where QueueingHoneyBadgerBuilder::new(dyn_hb) } - fn step( - &mut self, - fault_log: FaultLog, - ) -> Result> { - Ok(Step::new(self.output.drain(..).collect(), fault_log)) - } - /// Returns a reference to the internal `DynamicHoneyBadger` instance. pub fn dyn_hb(&self) -> &DynamicHoneyBadger, NodeUid> { &self.dyn_hb } /// Initiates the next epoch by proposing a batch from the queue. - fn propose(&mut self) -> Result> { + fn propose(&mut self) -> Result> { let amount = cmp::max(1, self.batch_size / self.dyn_hb.netinfo().num_nodes()); - // TODO: This will loop forever if we are the only validator. - let mut fault_log = FaultLog::new(); - while !self.dyn_hb.has_input() { + // TODO: This will output immediately if we are the only validator. + if self.dyn_hb.has_input() { + Ok(Step::default()) // Error? + } else { let proposal = self.queue.choose(amount, self.batch_size); let step = self.dyn_hb.input(Input::User(proposal))?; - fault_log.extend(step.fault_log); - for batch in step.output { - self.queue.remove_all(batch.iter()); - self.output.push_back(batch); - } + Ok(Step::new(step.output, step.fault_log, step.messages)) } - Ok(fault_log) } } diff --git a/tests/broadcast.rs b/tests/broadcast.rs index faf0a38..76531cb 100644 --- a/tests/broadcast.rs +++ b/tests/broadcast.rs @@ -80,8 +80,9 @@ impl Adversary> for ProposeAdversary { let netinfo = Arc::new(NetworkInfo::generate_map(node_ids).remove(&id).unwrap()); let mut bc = Broadcast::new(netinfo, id).expect("broadcast instance"); // FIXME: Use the output. - let _ = bc.input(b"Fake news".to_vec()).expect("propose"); - bc.message_iter() + let step = bc.input(b"Fake news".to_vec()).expect("propose"); + step.messages + .into_iter() .map(|msg| MessageWithSender::new(id, msg)) .collect() } diff --git a/tests/dynamic_honey_badger.rs b/tests/dynamic_honey_badger.rs index 5e6d0ba..9be87a3 100644 --- a/tests/dynamic_honey_badger.rs +++ b/tests/dynamic_honey_badger.rs @@ -127,9 +127,7 @@ where // Allow passing `netinfo` by value. `TestNetwork` expects this function signature. #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] fn new_dynamic_hb(netinfo: Arc>) -> UsizeDhb { - DynamicHoneyBadger::builder((*netinfo).clone()) - .build() - .expect("instantiate DHB") + DynamicHoneyBadger::builder().build((*netinfo).clone()) } fn test_dynamic_honey_badger_different_sizes(new_adversary: F, num_txs: usize) diff --git a/tests/network/mod.rs b/tests/network/mod.rs index 8935ef5..159409c 100644 --- a/tests/network/mod.rs +++ b/tests/network/mod.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use rand::{self, Rng}; use hbbft::crypto::SecretKeyShare; -use hbbft::messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage}; +use hbbft::messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage}; /// A node identifier. In the tests, nodes are simply numbered. #[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy, Serialize, Deserialize, Rand)] @@ -22,6 +22,8 @@ pub struct TestNode { pub queue: VecDeque<(D::NodeUid, D::Message)>, /// The values this node has output so far. outputs: Vec, + /// Outgoing messages to be sent to other nodes. + messages: VecDeque>, } impl TestNode { @@ -40,6 +42,7 @@ impl TestNode { pub fn input(&mut self, input: D::Input) { let step = self.algo.input(input).expect("input"); self.outputs.extend(step.output); + self.messages.extend(step.messages); } /// Returns the internal algorithm's instance. @@ -49,12 +52,13 @@ impl TestNode { } /// Creates a new test node with the given broadcast instance. - fn new(algo: D) -> TestNode { + fn new((algo, step): (D, Step)) -> TestNode { TestNode { id: algo.our_id().clone(), algo, queue: VecDeque::new(), - outputs: Vec::new(), + outputs: step.output.into_iter().collect(), + messages: step.messages, } } @@ -67,6 +71,7 @@ impl TestNode { .handle_message(&from_id, msg) .expect("handling message"); self.outputs.extend(step.output); + self.messages.extend(step.messages); } /// Checks whether the node has messages to process @@ -364,6 +369,7 @@ where { /// Creates a new network with `good_num` good nodes, and the given `adversary` controlling /// `adv_num` nodes. + #[allow(unused)] // Not used in all tests. pub fn new( good_num: usize, adv_num: usize, @@ -373,6 +379,23 @@ where where F: Fn(Arc>) -> D, G: Fn(BTreeMap>>) -> A, + { + Self::new_with_step(good_num, adv_num, adversary, |netinfo| { + (new_algo(netinfo), Step::default()) + }) + } + + /// Creates a new network with `good_num` good nodes, and the given `adversary` controlling + /// `adv_num` nodes. + pub fn new_with_step( + good_num: usize, + adv_num: usize, + adversary: G, + new_algo: F, + ) -> TestNetwork + where + F: Fn(Arc>) -> (D, Step), + G: Fn(BTreeMap>>) -> A, { let mut rng = rand::thread_rng(); let mut netinfos = NetworkInfo::generate_map((0..(good_num + adv_num)).map(NodeUid)); @@ -412,7 +435,7 @@ where } let mut initial_msgs: Vec<(D::NodeUid, Vec<_>)> = Vec::new(); for (id, node) in &mut network.nodes { - initial_msgs.push((*id, node.algo.message_iter().collect())); + initial_msgs.push((*id, node.messages.drain(..).collect())); } for (id, msgs) in initial_msgs { network.dispatch_messages(id, msgs); @@ -476,7 +499,7 @@ where // The node handles the incoming message and creates new outgoing ones to be dispatched. let msgs: Vec<_> = { - let node = self.nodes.get_mut(&id).unwrap(); + let mut node = self.nodes.get_mut(&id).unwrap(); // Ensure the adversary is playing fair by selecting a node that will result in actual // progress being made, otherwise `TestNode::handle_message()` will panic on `expect()` @@ -487,7 +510,7 @@ where ); node.handle_message(); - node.algo.message_iter().collect() + node.messages.drain(..).collect() }; self.dispatch_messages(id, msgs); @@ -497,9 +520,9 @@ where /// Inputs a value in node `id`. pub fn input(&mut self, id: NodeUid, value: D::Input) { let msgs: Vec<_> = { - let node = self.nodes.get_mut(&id).expect("input instance"); + let mut node = self.nodes.get_mut(&id).expect("input instance"); node.input(value); - node.algo.message_iter().collect() + node.messages.drain(..).collect() }; self.dispatch_messages(id, msgs); } diff --git a/tests/queueing_honey_badger.rs b/tests/queueing_honey_badger.rs index 2689cb7..7853eb3 100644 --- a/tests/queueing_honey_badger.rs +++ b/tests/queueing_honey_badger.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use hbbft::dynamic_honey_badger::DynamicHoneyBadger; use hbbft::messaging::NetworkInfo; -use hbbft::queueing_honey_badger::{Batch, Change, ChangeState, Input, QueueingHoneyBadger}; +use hbbft::queueing_honey_badger::{Batch, Change, ChangeState, Input, QueueingHoneyBadger, Step}; use itertools::Itertools; use rand::Rng; @@ -107,10 +107,10 @@ where // Allow passing `netinfo` by value. `TestNetwork` expects this function signature. #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] -fn new_queueing_hb(netinfo: Arc>) -> QueueingHoneyBadger { - let dyn_hb = DynamicHoneyBadger::builder((*netinfo).clone()) - .build() - .expect("instantiate DHB"); +fn new_queueing_hb( + netinfo: Arc>, +) -> (QueueingHoneyBadger, Step) { + let dyn_hb = DynamicHoneyBadger::builder().build((*netinfo).clone()); QueueingHoneyBadger::builder(dyn_hb).batch_size(3).build() } @@ -133,7 +133,8 @@ where num_good_nodes, num_adv_nodes ); let adversary = |adv_nodes| new_adversary(num_good_nodes, num_adv_nodes, adv_nodes); - let network = TestNetwork::new(num_good_nodes, num_adv_nodes, adversary, new_queueing_hb); + let network = + TestNetwork::new_with_step(num_good_nodes, num_adv_nodes, adversary, new_queueing_hb); test_queueing_honey_badger(network, num_txs); } }