From 65b3097238f8349b93c936ace70774d0318c6b0d Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Wed, 18 Jul 2018 13:15:47 +0100 Subject: [PATCH] message queue refactoring WIP --- examples/network/node.rs | 6 ++-- examples/simulation.rs | 12 ++++--- src/agreement/mod.rs | 32 +++++++----------- src/broadcast.rs | 7 ++-- src/common_coin.rs | 15 ++++----- src/common_subset.rs | 38 ++++++++++++--------- src/dynamic_honey_badger/mod.rs | 29 ++++++++-------- src/honey_badger.rs | 59 ++++++++++++++++++--------------- src/messaging.rs | 54 ++++++++++++------------------ src/queueing_honey_badger.rs | 40 ++++++++++++++-------- tests/broadcast.rs | 5 +-- tests/network/mod.rs | 15 ++++++--- 12 files changed, 162 insertions(+), 150 deletions(-) diff --git a/examples/network/node.rs b/examples/network/node.rs index f2865f4..ce5a359 100644 --- a/examples/network/node.rs +++ b/examples/network/node.rs @@ -144,8 +144,8 @@ impl + PartialEq + Send + Sync + From> + if let Some(v) = value { // FIXME: Use the output. - let _ = broadcast.input(v.clone().into()).expect("propose value"); - for msg in broadcast.message_iter() { + let step = broadcast.input(v.clone().into()).expect("propose value"); + for msg in step.messages { tx_from_algo.send(msg).expect("send from algo"); } } @@ -158,7 +158,7 @@ impl + PartialEq + Send + Sync + From> + let step = broadcast .handle_message(&i, message) .expect("handle broadcast message"); - for msg in broadcast.message_iter() { + for msg in step.messages { debug!("{} sending to {:?}: {:?}", our_id, msg.target, msg.message); tx_from_algo.send(msg).expect("send from algo"); } diff --git a/examples/simulation.rs b/examples/simulation.rs index f51dd7b..92261e2 100644 --- a/examples/simulation.rs +++ b/examples/simulation.rs @@ -178,12 +178,16 @@ where /// Handles the algorithm's output and messages. fn send_output_and_msgs( &mut self, - step: Step<::NodeUid, ::Output>, + step: Step< + ::NodeUid, + ::Output, + ::Message, + >, ) { let start = Instant::now(); - let out_msgs: Vec<_> = self - .algo - .message_iter() + let out_msgs: Vec<_> = step + .messages + .into_iter() .map(|msg| { ( msg.target, diff --git a/src/agreement/mod.rs b/src/agreement/mod.rs index bd3f623..dc251a5 100644 --- a/src/agreement/mod.rs +++ b/src/agreement/mod.rs @@ -208,7 +208,7 @@ pub struct Agreement { coin_schedule: CoinSchedule, } -pub type AgreementStep = Step; +pub type AgreementStep = Step; impl DistAlgorithm for Agreement { type NodeUid = NodeUid; @@ -247,13 +247,6 @@ impl DistAlgorithm for Agreement { self.step(fault_log) } - /// Take the next Agreement message for multicast to all other nodes. - fn next_message(&mut self) -> Option> { - self.messages - .pop_front() - .map(|msg| Target::All.message(msg)) - } - /// Whether the algorithm has terminated. fn terminated(&self) -> bool { self.terminated @@ -305,6 +298,10 @@ impl Agreement { Ok(Step::new( self.output.take().into_iter().collect(), fault_log, + self.messages + .drain(..) + .map(|msg| Target::All.message(msg)) + .collect(), )) } @@ -494,7 +491,6 @@ impl Agreement { msg: CommonCoinMessage, ) -> AgreementResult> { let coin_step = self.common_coin.handle_message(sender_id, msg)?; - self.extend_common_coin(); self.on_coin_step(coin_step) } @@ -505,7 +501,14 @@ impl Agreement { let Step { output, mut fault_log, + messages, } = coin_step; + let epoch = self.epoch; + self.messages.extend(messages.into_iter().map( + |msg: TargetedMessage| { + AgreementContent::Coin(Box::new(msg.message)).with_epoch(epoch) + }, + )); if let Some(coin) = output.into_iter().next() { let def_bin_value = self.count_conf().1.definite(); fault_log.extend(self.on_coin(coin, def_bin_value)?); @@ -562,16 +565,6 @@ impl Agreement { } } - /// Propagates Common Coin messages to the top level. - fn extend_common_coin(&mut self) { - let epoch = self.epoch; - self.messages.extend(self.common_coin.message_iter().map( - |msg: TargetedMessage| { - AgreementContent::Coin(Box::new(msg.message)).with_epoch(epoch) - }, - )); - } - /// Decides on a value and broadcasts a `Term` message with that value. fn decide(&mut self, b: bool) { if self.terminated { @@ -602,7 +595,6 @@ impl Agreement { { // Invoke the common coin. let coin_step = self.common_coin.input(())?; - self.extend_common_coin(); self.on_coin_step(coin_step) } else { // Continue waiting for (N - f) `Conf` messages diff --git a/src/broadcast.rs b/src/broadcast.rs index c5554cd..9f2ceb1 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -228,7 +228,7 @@ pub struct Broadcast { output: Option>, } -pub type BroadcastStep = Step>; +pub type BroadcastStep = Step, BroadcastMessage>; impl DistAlgorithm for Broadcast { type NodeUid = NodeUid; @@ -270,10 +270,6 @@ impl DistAlgorithm for Broadcast { self.step(fault_log) } - fn next_message(&mut self) -> Option> { - self.messages.pop_front() - } - fn terminated(&self) -> bool { self.decided } @@ -310,6 +306,7 @@ impl Broadcast { Ok(Step::new( self.output.take().into_iter().collect(), fault_log, + self.messages.drain(..).collect(), )) } diff --git a/src/common_coin.rs b/src/common_coin.rs index 56cd9ce..c11018f 100644 --- a/src/common_coin.rs +++ b/src/common_coin.rs @@ -28,7 +28,7 @@ use std::sync::Arc; use crypto::error as cerror; use crypto::{Signature, SignatureShare}; use fault_log::{FaultKind, FaultLog}; -use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage}; +use messaging::{DistAlgorithm, NetworkInfo, Step, Target}; error_chain! { links { @@ -78,7 +78,7 @@ pub struct CommonCoin { terminated: bool, } -pub type CommonCoinStep = Step; +pub type CommonCoinStep = Step; impl DistAlgorithm for CommonCoin where @@ -117,13 +117,6 @@ where self.step(fault_log) } - /// Takes the next share of a threshold signature message for multicasting to all other nodes. - fn next_message(&mut self) -> Option> { - self.messages - .pop_front() - .map(|msg| Target::All.message(msg)) - } - /// Whether the algorithm has terminated. fn terminated(&self) -> bool { self.terminated @@ -155,6 +148,10 @@ where Ok(Step::new( self.output.take().into_iter().collect(), fault_log, + self.messages + .drain(..) + .map(|msg| Target::All.message(msg)) + .collect(), )) } diff --git a/src/common_subset.rs b/src/common_subset.rs index 475521b..77cd336 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -70,19 +70,27 @@ struct MessageQueue(VecDeque, No impl MessageQueue { /// Appends to the queue the messages from `agr`, wrapped with `proposer_id`. - fn extend_agreement(&mut self, proposer_id: &NodeUid, agr: &mut Agreement) { + fn extend_agreement( + &mut self, + proposer_id: &NodeUid, + msgs: &mut VecDeque>, + ) { let convert = |msg: TargetedMessage| { msg.map(|a_msg| Message::Agreement(proposer_id.clone(), a_msg)) }; - self.extend(agr.message_iter().map(convert)); + self.extend(msgs.drain(..).map(convert)); } /// Appends to the queue the messages from `bc`, wrapped with `proposer_id`. - fn extend_broadcast(&mut self, proposer_id: &NodeUid, bc: &mut Broadcast) { + fn extend_broadcast( + &mut self, + proposer_id: &NodeUid, + msgs: &mut VecDeque>, + ) { let convert = |msg: TargetedMessage| { msg.map(|b_msg| Message::Broadcast(proposer_id.clone(), b_msg)) }; - self.extend(bc.message_iter().map(convert)); + self.extend(msgs.drain(..).map(convert)); } } @@ -102,7 +110,8 @@ pub struct CommonSubset { decided: bool, } -pub type CommonSubsetStep = Step>; +pub type CommonSubsetStep = + Step, Message>; impl DistAlgorithm for CommonSubset { type NodeUid = NodeUid; @@ -133,10 +142,6 @@ impl DistAlgorithm for CommonSubset Option> { - self.messages.pop_front() - } - fn terminated(&self) -> bool { self.messages.is_empty() && self.agreement_instances.values().all(Agreement::terminated) } @@ -185,6 +190,7 @@ impl CommonSubset { Ok(Step::new( self.output.take().into_iter().collect(), fault_log, + self.messages.drain(..).collect(), )) } @@ -243,9 +249,10 @@ impl CommonSubset { .broadcast_instances .get_mut(proposer_id) .ok_or(ErrorKind::NoSuchBroadcastInstance)?; - let step = f(broadcast)?; + let mut step = f(broadcast)?; fault_log.extend(step.fault_log); - self.messages.extend_broadcast(&proposer_id, broadcast); + self.messages + .extend_broadcast(&proposer_id, &mut step.messages); if let Some(output) = step.output.into_iter().next() { output } else { @@ -284,9 +291,10 @@ impl CommonSubset { if agreement.terminated() { return Ok(fault_log); } - let step = f(agreement)?; + let mut step = f(agreement)?; fault_log.extend(step.fault_log); - self.messages.extend_agreement(proposer_id, agreement); + self.messages + .extend_agreement(proposer_id, &mut step.messages); if let Some(output) = step.output.into_iter().next() { output } else { @@ -311,9 +319,9 @@ impl CommonSubset { // input 0 to each instance of BA that has not yet been provided input. for (uid, agreement) in &mut self.agreement_instances { if agreement.accepts_input() { - let step = agreement.input(false)?; + let mut step = agreement.input(false)?; fault_log.extend(step.fault_log); - self.messages.extend_agreement(uid, agreement); + self.messages.extend_agreement(uid, &mut step.messages); if let Some(output) = step.output.into_iter().next() { if self.agreement_results.insert(uid.clone(), output).is_some() { return Err(ErrorKind::MultipleAgreementResults.into()); diff --git a/src/dynamic_honey_badger/mod.rs b/src/dynamic_honey_badger/mod.rs index 5e8b468..fbcb2d6 100644 --- a/src/dynamic_honey_badger/mod.rs +++ b/src/dynamic_honey_badger/mod.rs @@ -116,7 +116,7 @@ pub struct DynamicHoneyBadger { output: VecDeque>, } -pub type DynamicHoneyBadgerStep = Step>; +pub type DynamicHoneyBadgerStep = Step, Message>; impl DistAlgorithm for DynamicHoneyBadger where @@ -169,10 +169,6 @@ where self.step(fault_log) } - fn next_message(&mut self) -> Option> { - self.messages.pop_front() - } - fn terminated(&self) -> bool { false } @@ -188,7 +184,11 @@ where NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash + Rand, { fn step(&mut self, fault_log: FaultLog) -> Result> { - Ok(Step::new(self.output.drain(..).collect(), fault_log)) + Ok(Step::new( + self.output.drain(..).collect(), + fault_log, + self.messages.drain(..).collect(), + )) } /// Returns a new `DynamicHoneyBadgerBuilder` configured to use the node IDs and cryptographic @@ -276,7 +276,7 @@ where /// Processes all pending batches output by Honey Badger. fn process_output( &mut self, - step: HoneyBadgerStep, NodeUid>, + mut step: HoneyBadgerStep, NodeUid>, ) -> Result> { let mut fault_log = FaultLog::new(); fault_log.extend(step.fault_log); @@ -329,7 +329,7 @@ where self.output.push_back(batch); } self.messages - .extend_with_epoch(self.start_epoch, &mut self.honey_badger); + .extend_with_epoch(self.start_epoch, &mut step.messages); // If `start_epoch` changed, we can now handle some queued messages. if start_epoch < self.start_epoch { let queue = mem::replace(&mut self.incoming_queue, Vec::new()); @@ -375,7 +375,7 @@ where fn restart_honey_badger(&mut self, epoch: u64) { // TODO: Filter out the messages for `epoch` and later. self.messages - .extend_with_epoch(self.start_epoch, &mut self.honey_badger); + .extend_with_epoch(self.start_epoch, &mut self.honey_badger.messages.0); self.start_epoch = epoch; self.key_gen_msg_buffer.retain(|kg_msg| kg_msg.0 >= epoch); let netinfo = Arc::new(self.netinfo.clone()); @@ -529,14 +529,15 @@ where NodeUid: Eq + Hash + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Rand, { /// Appends to the queue the messages from `hb`, wrapped with `epoch`. - fn extend_with_epoch(&mut self, epoch: u64, hb: &mut HoneyBadger) - where - Tx: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash, - { + fn extend_with_epoch( + &mut self, + epoch: u64, + msgs: &mut VecDeque, NodeUid>>, + ) { let convert = |msg: TargetedMessage, NodeUid>| { msg.map(|hb_msg| Message::HoneyBadger(epoch, hb_msg)) }; - self.extend(hb.message_iter().map(convert)); + self.extend(msgs.drain(..).map(convert)); } } diff --git a/src/honey_badger.rs b/src/honey_badger.rs index 4394e38..b9a3604 100644 --- a/src/honey_badger.rs +++ b/src/honey_badger.rs @@ -119,7 +119,7 @@ pub struct HoneyBadger { /// The maximum number of `CommonSubset` instances that we run simultaneously. max_future_epochs: u64, /// The messages that need to be sent to other nodes. - messages: MessageQueue, + pub(crate) messages: MessageQueue, /// The outputs from completed epochs. output: Vec>, /// Messages for future epochs that couldn't be handled yet. @@ -134,7 +134,7 @@ pub struct HoneyBadger { ciphertexts: BTreeMap>, } -pub type HoneyBadgerStep = Step>; +pub type HoneyBadgerStep = Step, Message>; impl DistAlgorithm for HoneyBadger where @@ -174,10 +174,6 @@ where self.step(fault_log) } - fn next_message(&mut self) -> Option> { - self.messages.pop_front() - } - fn terminated(&self) -> bool { false } @@ -202,7 +198,11 @@ where &mut self, fault_log: FaultLog, ) -> HoneyBadgerResult> { - Ok(Step::new(self.output.drain(..).collect(), fault_log)) + Ok(Step::new( + self.output.drain(..).collect(), + fault_log, + self.messages.drain(..).collect(), + )) } /// Proposes a new item in the current epoch. @@ -220,11 +220,9 @@ where let ser_prop = bincode::serialize(&proposal)?; let ciphertext = self.netinfo.public_key_set().public_key().encrypt(ser_prop); self.has_input = true; - let step = cs.input(bincode::serialize(&ciphertext).unwrap())?; - self.messages.extend_with_epoch(self.epoch, cs); - step + cs.input(bincode::serialize(&ciphertext).unwrap())? }; - Ok(self.process_output(step)?) + Ok(self.process_output(step, None)?) } /// Returns `true` if input for the current epoch has already been provided. @@ -270,15 +268,9 @@ where } } }; - // Handle the message and put the outgoing messages into the queue. - let cs_step = cs.handle_message(sender_id, message)?; - self.messages.extend_with_epoch(epoch, cs); - cs_step + cs.handle_message(sender_id, message)? }; - // If this is the current epoch, the message could cause a new output. - if epoch == self.epoch { - fault_log.extend(self.process_output(step)?); - } + fault_log.extend(self.process_output(step, Some(epoch))?); self.remove_terminated(epoch); Ok(fault_log) } @@ -576,18 +568,27 @@ where } } - /// Checks whether the current epoch has output, and if it does, sends out our decryption shares. + /// Checks whether the current epoch has output, and if it does, sends out our decryption + /// shares. The `epoch` argument allows to differentiate between calls which produce output in + /// all conditions, `epoch == None`, and calls which only produce output in a given epoch, + /// `epoch == Some(given_epoch)`. fn process_output( &mut self, step: CommonSubsetStep, + epoch: Option, ) -> HoneyBadgerResult> { let Step { output, mut fault_log, + mut messages, } = step; - for cs_output in output { - fault_log.extend(self.send_decryption_shares(cs_output)?); - // TODO: May also check that there is no further output from Common Subset. + self.messages.extend_with_epoch(self.epoch, &mut messages); + // If this is the current epoch, the message could cause a new output. + if epoch.is_none() || epoch == Some(self.epoch) { + for cs_output in output { + fault_log.extend(self.send_decryption_shares(cs_output)?); + // TODO: May also check that there is no further output from Common Subset. + } } Ok(fault_log) } @@ -695,14 +696,20 @@ impl Message { /// The queue of outgoing messages in a `HoneyBadger` instance. #[derive(Deref, DerefMut)] -struct MessageQueue(VecDeque, NodeUid>>); +pub(crate) struct MessageQueue( + pub(crate) VecDeque, NodeUid>>, +); impl MessageQueue { /// Appends to the queue the messages from `cs`, wrapped with `epoch`. - fn extend_with_epoch(&mut self, epoch: u64, cs: &mut CommonSubset) { + fn extend_with_epoch( + &mut self, + epoch: u64, + msgs: &mut VecDeque, NodeUid>>, + ) { let convert = |msg: TargetedMessage, NodeUid>| { msg.map(|cs_msg| MessageContent::CommonSubset(cs_msg).with_epoch(epoch)) }; - self.extend(cs.message_iter().map(convert)); + self.extend(msgs.drain(..).map(convert)); } } diff --git a/src/messaging.rs b/src/messaging.rs index f5f1725..5b05a1e 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -52,35 +52,47 @@ impl TargetedMessage { /// Result of one step of the local state machine of a distributed algorithm. Such a result should /// be used and never discarded by the client of the algorithm. #[must_use = "The algorithm step result must be used."] -pub struct Step +pub struct Step where N: Clone, { pub output: VecDeque, pub fault_log: FaultLog, + pub messages: VecDeque>, } -impl Default for Step +impl Default for Step where N: Clone, { - fn default() -> Step { + fn default() -> Step { Step { - output: Default::default(), + output: VecDeque::default(), fault_log: FaultLog::default(), + messages: VecDeque::default(), } } } -impl Step +impl Step where N: Clone, { - pub fn new(output: VecDeque, fault_log: FaultLog) -> Self { - Step { output, fault_log } + pub fn new( + output: VecDeque, + fault_log: FaultLog, + messages: VecDeque>, + ) -> Self { + Step { + output, + fault_log, + messages, + } } } +type StepResult = Result, E>; + /// A distributed algorithm that defines a message flow. pub trait DistAlgorithm { /// Unique node identifier. @@ -99,44 +111,20 @@ pub trait DistAlgorithm { fn input( &mut self, input: Self::Input, - ) -> Result, Self::Error>; + ) -> StepResult; /// Handles a message received from node `sender_id`. fn handle_message( &mut self, sender_id: &Self::NodeUid, message: Self::Message, - ) -> Result, Self::Error>; - - /// Returns a message that needs to be sent to another node. - fn next_message(&mut self) -> Option>; + ) -> StepResult; /// Returns `true` if execution has completed and this instance can be dropped. fn terminated(&self) -> bool; /// Returns this node's own ID. fn our_id(&self) -> &Self::NodeUid; - - /// Returns an iterator over the outgoing messages. - fn message_iter(&mut self) -> MessageIter - where - Self: Sized, - { - MessageIter { algorithm: self } - } -} - -/// An iterator over a distributed algorithm's outgoing messages. -pub struct MessageIter<'a, D: DistAlgorithm + 'a> { - algorithm: &'a mut D, -} - -impl<'a, D: DistAlgorithm + 'a> Iterator for MessageIter<'a, D> { - type Item = TargetedMessage; - - fn next(&mut self) -> Option { - self.algorithm.next_message() - } } /// Common data shared between algorithms: the nodes' IDs and key shares. diff --git a/src/queueing_honey_badger.rs b/src/queueing_honey_badger.rs index 9ddf73a..6c35c47 100644 --- a/src/queueing_honey_badger.rs +++ b/src/queueing_honey_badger.rs @@ -119,7 +119,7 @@ where output: VecDeque>, } -pub type QueueingHoneyBadgerStep = Step>; +pub type QueueingHoneyBadgerStep = Step, Message>; impl DistAlgorithm for QueueingHoneyBadger where @@ -135,18 +135,18 @@ where fn input(&mut self, input: Self::Input) -> Result> { // User transactions are forwarded to `HoneyBadger` right away. Internal messages are // in addition signed and broadcast. - let fault_log = match input { + let (fault_log, messages) = match input { Input::User(tx) => { self.queue.0.push_back(tx); - FaultLog::new() + (FaultLog::new(), VecDeque::new()) } Input::Change(change) => { let step = self.dyn_hb.input(Input::Change(change))?; // FIXME: Use the output since `dyn_hb` can output immediately on input. - step.fault_log + (step.fault_log, step.messages) } }; - self.step(fault_log) + self.step(fault_log, messages) } fn handle_message( @@ -157,17 +157,16 @@ where let Step { output, mut fault_log, + mut messages, } = self.dyn_hb.handle_message(sender_id, message)?; for batch in output { self.queue.remove_all(batch.iter()); self.output.push_back(batch); } - fault_log.extend(self.propose()?); - self.step(fault_log) - } - - fn next_message(&mut self) -> Option> { - self.dyn_hb.next_message() + let (propose_fault_log, propose_messages) = self.propose()?; + fault_log.extend(propose_fault_log); + messages.extend(propose_messages); + self.step(fault_log, messages) } fn terminated(&self) -> bool { @@ -195,8 +194,14 @@ where fn step( &mut self, fault_log: FaultLog, + messages: VecDeque, NodeUid>>, ) -> Result> { - Ok(Step::new(self.output.drain(..).collect(), fault_log)) + Ok(Step::new( + self.output.drain(..).collect(), + fault_log, + messages, + //self.dyn_hb.messages.drain(..).collect(), + )) } /// Returns a reference to the internal `DynamicHoneyBadger` instance. @@ -205,20 +210,27 @@ where } /// Initiates the next epoch by proposing a batch from the queue. - fn propose(&mut self) -> Result> { + fn propose( + &mut self, + ) -> Result<( + FaultLog, + VecDeque, NodeUid>>, + )> { let amount = cmp::max(1, self.batch_size / self.dyn_hb.netinfo().num_nodes()); // TODO: This will loop forever if we are the only validator. let mut fault_log = FaultLog::new(); + let mut messages = VecDeque::new(); while !self.dyn_hb.has_input() { let proposal = self.queue.choose(amount, self.batch_size); let step = self.dyn_hb.input(Input::User(proposal))?; fault_log.extend(step.fault_log); + messages.extend(step.messages); for batch in step.output { self.queue.remove_all(batch.iter()); self.output.push_back(batch); } } - Ok(fault_log) + Ok((fault_log, messages)) } } diff --git a/tests/broadcast.rs b/tests/broadcast.rs index faf0a38..76531cb 100644 --- a/tests/broadcast.rs +++ b/tests/broadcast.rs @@ -80,8 +80,9 @@ impl Adversary> for ProposeAdversary { let netinfo = Arc::new(NetworkInfo::generate_map(node_ids).remove(&id).unwrap()); let mut bc = Broadcast::new(netinfo, id).expect("broadcast instance"); // FIXME: Use the output. - let _ = bc.input(b"Fake news".to_vec()).expect("propose"); - bc.message_iter() + let step = bc.input(b"Fake news".to_vec()).expect("propose"); + step.messages + .into_iter() .map(|msg| MessageWithSender::new(id, msg)) .collect() } diff --git a/tests/network/mod.rs b/tests/network/mod.rs index 8935ef5..9a5886e 100644 --- a/tests/network/mod.rs +++ b/tests/network/mod.rs @@ -22,6 +22,8 @@ pub struct TestNode { pub queue: VecDeque<(D::NodeUid, D::Message)>, /// The values this node has output so far. outputs: Vec, + /// Outgoing messages to be sent to other nodes. + messages: VecDeque>, } impl TestNode { @@ -40,6 +42,7 @@ impl TestNode { pub fn input(&mut self, input: D::Input) { let step = self.algo.input(input).expect("input"); self.outputs.extend(step.output); + self.messages.extend(step.messages); } /// Returns the internal algorithm's instance. @@ -55,6 +58,7 @@ impl TestNode { algo, queue: VecDeque::new(), outputs: Vec::new(), + messages: VecDeque::new(), } } @@ -67,6 +71,7 @@ impl TestNode { .handle_message(&from_id, msg) .expect("handling message"); self.outputs.extend(step.output); + self.messages.extend(step.messages); } /// Checks whether the node has messages to process @@ -412,7 +417,7 @@ where } let mut initial_msgs: Vec<(D::NodeUid, Vec<_>)> = Vec::new(); for (id, node) in &mut network.nodes { - initial_msgs.push((*id, node.algo.message_iter().collect())); + initial_msgs.push((*id, node.messages.drain(..).collect())); } for (id, msgs) in initial_msgs { network.dispatch_messages(id, msgs); @@ -476,7 +481,7 @@ where // The node handles the incoming message and creates new outgoing ones to be dispatched. let msgs: Vec<_> = { - let node = self.nodes.get_mut(&id).unwrap(); + let mut node = self.nodes.get_mut(&id).unwrap(); // Ensure the adversary is playing fair by selecting a node that will result in actual // progress being made, otherwise `TestNode::handle_message()` will panic on `expect()` @@ -487,7 +492,7 @@ where ); node.handle_message(); - node.algo.message_iter().collect() + node.messages.drain(..).collect() }; self.dispatch_messages(id, msgs); @@ -497,9 +502,9 @@ where /// Inputs a value in node `id`. pub fn input(&mut self, id: NodeUid, value: D::Input) { let msgs: Vec<_> = { - let node = self.nodes.get_mut(&id).expect("input instance"); + let mut node = self.nodes.get_mut(&id).expect("input instance"); node.input(value); - node.algo.message_iter().collect() + node.messages.drain(..).collect() }; self.dispatch_messages(id, msgs); }