message queue refactoring WIP

This commit is contained in:
Vladimir Komendantskiy 2018-07-18 13:15:47 +01:00
parent 9488d3f936
commit 65b3097238
12 changed files with 162 additions and 150 deletions

View File

@ -144,8 +144,8 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
if let Some(v) = value {
// FIXME: Use the output.
let _ = broadcast.input(v.clone().into()).expect("propose value");
for msg in broadcast.message_iter() {
let step = broadcast.input(v.clone().into()).expect("propose value");
for msg in step.messages {
tx_from_algo.send(msg).expect("send from algo");
}
}
@ -158,7 +158,7 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
let step = broadcast
.handle_message(&i, message)
.expect("handle broadcast message");
for msg in broadcast.message_iter() {
for msg in step.messages {
debug!("{} sending to {:?}: {:?}", our_id, msg.target, msg.message);
tx_from_algo.send(msg).expect("send from algo");
}

View File

@ -178,12 +178,16 @@ where
/// Handles the algorithm's output and messages.
fn send_output_and_msgs(
&mut self,
step: Step<<D as DistAlgorithm>::NodeUid, <D as DistAlgorithm>::Output>,
step: Step<
<D as DistAlgorithm>::NodeUid,
<D as DistAlgorithm>::Output,
<D as DistAlgorithm>::Message,
>,
) {
let start = Instant::now();
let out_msgs: Vec<_> = self
.algo
.message_iter()
let out_msgs: Vec<_> = step
.messages
.into_iter()
.map(|msg| {
(
msg.target,

View File

@ -208,7 +208,7 @@ pub struct Agreement<NodeUid> {
coin_schedule: CoinSchedule,
}
pub type AgreementStep<NodeUid> = Step<NodeUid, bool>;
pub type AgreementStep<N> = Step<N, bool, AgreementMessage>;
impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
type NodeUid = NodeUid;
@ -247,13 +247,6 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
self.step(fault_log)
}
/// Take the next Agreement message for multicast to all other nodes.
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, Self::NodeUid>> {
self.messages
.pop_front()
.map(|msg| Target::All.message(msg))
}
/// Whether the algorithm has terminated.
fn terminated(&self) -> bool {
self.terminated
@ -305,6 +298,10 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
Ok(Step::new(
self.output.take().into_iter().collect(),
fault_log,
self.messages
.drain(..)
.map(|msg| Target::All.message(msg))
.collect(),
))
}
@ -494,7 +491,6 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
msg: CommonCoinMessage,
) -> AgreementResult<FaultLog<NodeUid>> {
let coin_step = self.common_coin.handle_message(sender_id, msg)?;
self.extend_common_coin();
self.on_coin_step(coin_step)
}
@ -505,7 +501,14 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
let Step {
output,
mut fault_log,
messages,
} = coin_step;
let epoch = self.epoch;
self.messages.extend(messages.into_iter().map(
|msg: TargetedMessage<CommonCoinMessage, NodeUid>| {
AgreementContent::Coin(Box::new(msg.message)).with_epoch(epoch)
},
));
if let Some(coin) = output.into_iter().next() {
let def_bin_value = self.count_conf().1.definite();
fault_log.extend(self.on_coin(coin, def_bin_value)?);
@ -562,16 +565,6 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
}
}
/// Propagates Common Coin messages to the top level.
fn extend_common_coin(&mut self) {
let epoch = self.epoch;
self.messages.extend(self.common_coin.message_iter().map(
|msg: TargetedMessage<CommonCoinMessage, NodeUid>| {
AgreementContent::Coin(Box::new(msg.message)).with_epoch(epoch)
},
));
}
/// Decides on a value and broadcasts a `Term` message with that value.
fn decide(&mut self, b: bool) {
if self.terminated {
@ -602,7 +595,6 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
{
// Invoke the common coin.
let coin_step = self.common_coin.input(())?;
self.extend_common_coin();
self.on_coin_step(coin_step)
} else {
// Continue waiting for (N - f) `Conf` messages

View File

@ -228,7 +228,7 @@ pub struct Broadcast<NodeUid> {
output: Option<Vec<u8>>,
}
pub type BroadcastStep<N> = Step<N, Vec<u8>>;
pub type BroadcastStep<N> = Step<N, Vec<u8>, BroadcastMessage>;
impl<NodeUid: Debug + Clone + Ord> DistAlgorithm for Broadcast<NodeUid> {
type NodeUid = NodeUid;
@ -270,10 +270,6 @@ impl<NodeUid: Debug + Clone + Ord> DistAlgorithm for Broadcast<NodeUid> {
self.step(fault_log)
}
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
self.messages.pop_front()
}
fn terminated(&self) -> bool {
self.decided
}
@ -310,6 +306,7 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
Ok(Step::new(
self.output.take().into_iter().collect(),
fault_log,
self.messages.drain(..).collect(),
))
}

View File

@ -28,7 +28,7 @@ use std::sync::Arc;
use crypto::error as cerror;
use crypto::{Signature, SignatureShare};
use fault_log::{FaultKind, FaultLog};
use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage};
use messaging::{DistAlgorithm, NetworkInfo, Step, Target};
error_chain! {
links {
@ -78,7 +78,7 @@ pub struct CommonCoin<NodeUid, T> {
terminated: bool,
}
pub type CommonCoinStep<NodeUid> = Step<NodeUid, bool>;
pub type CommonCoinStep<N> = Step<N, bool, CommonCoinMessage>;
impl<NodeUid, T> DistAlgorithm for CommonCoin<NodeUid, T>
where
@ -117,13 +117,6 @@ where
self.step(fault_log)
}
/// Takes the next share of a threshold signature message for multicasting to all other nodes.
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, Self::NodeUid>> {
self.messages
.pop_front()
.map(|msg| Target::All.message(msg))
}
/// Whether the algorithm has terminated.
fn terminated(&self) -> bool {
self.terminated
@ -155,6 +148,10 @@ where
Ok(Step::new(
self.output.take().into_iter().collect(),
fault_log,
self.messages
.drain(..)
.map(|msg| Target::All.message(msg))
.collect(),
))
}

View File

@ -70,19 +70,27 @@ struct MessageQueue<NodeUid: Rand>(VecDeque<TargetedMessage<Message<NodeUid>, No
impl<NodeUid: Clone + Debug + Ord + Rand> MessageQueue<NodeUid> {
/// Appends to the queue the messages from `agr`, wrapped with `proposer_id`.
fn extend_agreement(&mut self, proposer_id: &NodeUid, agr: &mut Agreement<NodeUid>) {
fn extend_agreement(
&mut self,
proposer_id: &NodeUid,
msgs: &mut VecDeque<TargetedMessage<AgreementMessage, NodeUid>>,
) {
let convert = |msg: TargetedMessage<AgreementMessage, NodeUid>| {
msg.map(|a_msg| Message::Agreement(proposer_id.clone(), a_msg))
};
self.extend(agr.message_iter().map(convert));
self.extend(msgs.drain(..).map(convert));
}
/// Appends to the queue the messages from `bc`, wrapped with `proposer_id`.
fn extend_broadcast(&mut self, proposer_id: &NodeUid, bc: &mut Broadcast<NodeUid>) {
fn extend_broadcast(
&mut self,
proposer_id: &NodeUid,
msgs: &mut VecDeque<TargetedMessage<BroadcastMessage, NodeUid>>,
) {
let convert = |msg: TargetedMessage<BroadcastMessage, NodeUid>| {
msg.map(|b_msg| Message::Broadcast(proposer_id.clone(), b_msg))
};
self.extend(bc.message_iter().map(convert));
self.extend(msgs.drain(..).map(convert));
}
}
@ -102,7 +110,8 @@ pub struct CommonSubset<NodeUid: Rand> {
decided: bool,
}
pub type CommonSubsetStep<NodeUid> = Step<NodeUid, BTreeMap<NodeUid, ProposedValue>>;
pub type CommonSubsetStep<NodeUid> =
Step<NodeUid, BTreeMap<NodeUid, ProposedValue>, Message<NodeUid>>;
impl<NodeUid: Clone + Debug + Ord + Rand> DistAlgorithm for CommonSubset<NodeUid> {
type NodeUid = NodeUid;
@ -133,10 +142,6 @@ impl<NodeUid: Clone + Debug + Ord + Rand> DistAlgorithm for CommonSubset<NodeUid
self.step(fault_log)
}
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, Self::NodeUid>> {
self.messages.pop_front()
}
fn terminated(&self) -> bool {
self.messages.is_empty() && self.agreement_instances.values().all(Agreement::terminated)
}
@ -185,6 +190,7 @@ impl<NodeUid: Clone + Debug + Ord + Rand> CommonSubset<NodeUid> {
Ok(Step::new(
self.output.take().into_iter().collect(),
fault_log,
self.messages.drain(..).collect(),
))
}
@ -243,9 +249,10 @@ impl<NodeUid: Clone + Debug + Ord + Rand> CommonSubset<NodeUid> {
.broadcast_instances
.get_mut(proposer_id)
.ok_or(ErrorKind::NoSuchBroadcastInstance)?;
let step = f(broadcast)?;
let mut step = f(broadcast)?;
fault_log.extend(step.fault_log);
self.messages.extend_broadcast(&proposer_id, broadcast);
self.messages
.extend_broadcast(&proposer_id, &mut step.messages);
if let Some(output) = step.output.into_iter().next() {
output
} else {
@ -284,9 +291,10 @@ impl<NodeUid: Clone + Debug + Ord + Rand> CommonSubset<NodeUid> {
if agreement.terminated() {
return Ok(fault_log);
}
let step = f(agreement)?;
let mut step = f(agreement)?;
fault_log.extend(step.fault_log);
self.messages.extend_agreement(proposer_id, agreement);
self.messages
.extend_agreement(proposer_id, &mut step.messages);
if let Some(output) = step.output.into_iter().next() {
output
} else {
@ -311,9 +319,9 @@ impl<NodeUid: Clone + Debug + Ord + Rand> CommonSubset<NodeUid> {
// input 0 to each instance of BA that has not yet been provided input.
for (uid, agreement) in &mut self.agreement_instances {
if agreement.accepts_input() {
let step = agreement.input(false)?;
let mut step = agreement.input(false)?;
fault_log.extend(step.fault_log);
self.messages.extend_agreement(uid, agreement);
self.messages.extend_agreement(uid, &mut step.messages);
if let Some(output) = step.output.into_iter().next() {
if self.agreement_results.insert(uid.clone(), output).is_some() {
return Err(ErrorKind::MultipleAgreementResults.into());

View File

@ -116,7 +116,7 @@ pub struct DynamicHoneyBadger<C, NodeUid: Rand> {
output: VecDeque<Batch<C, NodeUid>>,
}
pub type DynamicHoneyBadgerStep<C, NodeUid> = Step<NodeUid, Batch<C, NodeUid>>;
pub type DynamicHoneyBadgerStep<C, NodeUid> = Step<NodeUid, Batch<C, NodeUid>, Message<NodeUid>>;
impl<C, NodeUid> DistAlgorithm for DynamicHoneyBadger<C, NodeUid>
where
@ -169,10 +169,6 @@ where
self.step(fault_log)
}
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
self.messages.pop_front()
}
fn terminated(&self) -> bool {
false
}
@ -188,7 +184,11 @@ where
NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash + Rand,
{
fn step(&mut self, fault_log: FaultLog<NodeUid>) -> Result<DynamicHoneyBadgerStep<C, NodeUid>> {
Ok(Step::new(self.output.drain(..).collect(), fault_log))
Ok(Step::new(
self.output.drain(..).collect(),
fault_log,
self.messages.drain(..).collect(),
))
}
/// Returns a new `DynamicHoneyBadgerBuilder` configured to use the node IDs and cryptographic
@ -276,7 +276,7 @@ where
/// Processes all pending batches output by Honey Badger.
fn process_output(
&mut self,
step: HoneyBadgerStep<InternalContrib<C, NodeUid>, NodeUid>,
mut step: HoneyBadgerStep<InternalContrib<C, NodeUid>, NodeUid>,
) -> Result<FaultLog<NodeUid>> {
let mut fault_log = FaultLog::new();
fault_log.extend(step.fault_log);
@ -329,7 +329,7 @@ where
self.output.push_back(batch);
}
self.messages
.extend_with_epoch(self.start_epoch, &mut self.honey_badger);
.extend_with_epoch(self.start_epoch, &mut step.messages);
// If `start_epoch` changed, we can now handle some queued messages.
if start_epoch < self.start_epoch {
let queue = mem::replace(&mut self.incoming_queue, Vec::new());
@ -375,7 +375,7 @@ where
fn restart_honey_badger(&mut self, epoch: u64) {
// TODO: Filter out the messages for `epoch` and later.
self.messages
.extend_with_epoch(self.start_epoch, &mut self.honey_badger);
.extend_with_epoch(self.start_epoch, &mut self.honey_badger.messages.0);
self.start_epoch = epoch;
self.key_gen_msg_buffer.retain(|kg_msg| kg_msg.0 >= epoch);
let netinfo = Arc::new(self.netinfo.clone());
@ -529,14 +529,15 @@ where
NodeUid: Eq + Hash + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Rand,
{
/// Appends to the queue the messages from `hb`, wrapped with `epoch`.
fn extend_with_epoch<Tx>(&mut self, epoch: u64, hb: &mut HoneyBadger<Tx, NodeUid>)
where
Tx: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
{
fn extend_with_epoch(
&mut self,
epoch: u64,
msgs: &mut VecDeque<TargetedMessage<HbMessage<NodeUid>, NodeUid>>,
) {
let convert = |msg: TargetedMessage<HbMessage<NodeUid>, NodeUid>| {
msg.map(|hb_msg| Message::HoneyBadger(epoch, hb_msg))
};
self.extend(hb.message_iter().map(convert));
self.extend(msgs.drain(..).map(convert));
}
}

View File

@ -119,7 +119,7 @@ pub struct HoneyBadger<C, NodeUid: Rand> {
/// The maximum number of `CommonSubset` instances that we run simultaneously.
max_future_epochs: u64,
/// The messages that need to be sent to other nodes.
messages: MessageQueue<NodeUid>,
pub(crate) messages: MessageQueue<NodeUid>,
/// The outputs from completed epochs.
output: Vec<Batch<C, NodeUid>>,
/// Messages for future epochs that couldn't be handled yet.
@ -134,7 +134,7 @@ pub struct HoneyBadger<C, NodeUid: Rand> {
ciphertexts: BTreeMap<u64, BTreeMap<NodeUid, Ciphertext>>,
}
pub type HoneyBadgerStep<C, NodeUid> = Step<NodeUid, Batch<C, NodeUid>>;
pub type HoneyBadgerStep<C, NodeUid> = Step<NodeUid, Batch<C, NodeUid>, Message<NodeUid>>;
impl<C, NodeUid> DistAlgorithm for HoneyBadger<C, NodeUid>
where
@ -174,10 +174,6 @@ where
self.step(fault_log)
}
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
self.messages.pop_front()
}
fn terminated(&self) -> bool {
false
}
@ -202,7 +198,11 @@ where
&mut self,
fault_log: FaultLog<NodeUid>,
) -> HoneyBadgerResult<HoneyBadgerStep<C, NodeUid>> {
Ok(Step::new(self.output.drain(..).collect(), fault_log))
Ok(Step::new(
self.output.drain(..).collect(),
fault_log,
self.messages.drain(..).collect(),
))
}
/// Proposes a new item in the current epoch.
@ -220,11 +220,9 @@ where
let ser_prop = bincode::serialize(&proposal)?;
let ciphertext = self.netinfo.public_key_set().public_key().encrypt(ser_prop);
self.has_input = true;
let step = cs.input(bincode::serialize(&ciphertext).unwrap())?;
self.messages.extend_with_epoch(self.epoch, cs);
step
cs.input(bincode::serialize(&ciphertext).unwrap())?
};
Ok(self.process_output(step)?)
Ok(self.process_output(step, None)?)
}
/// Returns `true` if input for the current epoch has already been provided.
@ -270,15 +268,9 @@ where
}
}
};
// Handle the message and put the outgoing messages into the queue.
let cs_step = cs.handle_message(sender_id, message)?;
self.messages.extend_with_epoch(epoch, cs);
cs_step
cs.handle_message(sender_id, message)?
};
// If this is the current epoch, the message could cause a new output.
if epoch == self.epoch {
fault_log.extend(self.process_output(step)?);
}
fault_log.extend(self.process_output(step, Some(epoch))?);
self.remove_terminated(epoch);
Ok(fault_log)
}
@ -576,18 +568,27 @@ where
}
}
/// Checks whether the current epoch has output, and if it does, sends out our decryption shares.
/// Checks whether the current epoch has output, and if it does, sends out our decryption
/// shares. The `epoch` argument allows to differentiate between calls which produce output in
/// all conditions, `epoch == None`, and calls which only produce output in a given epoch,
/// `epoch == Some(given_epoch)`.
fn process_output(
&mut self,
step: CommonSubsetStep<NodeUid>,
epoch: Option<u64>,
) -> HoneyBadgerResult<FaultLog<NodeUid>> {
let Step {
output,
mut fault_log,
mut messages,
} = step;
for cs_output in output {
fault_log.extend(self.send_decryption_shares(cs_output)?);
// TODO: May also check that there is no further output from Common Subset.
self.messages.extend_with_epoch(self.epoch, &mut messages);
// If this is the current epoch, the message could cause a new output.
if epoch.is_none() || epoch == Some(self.epoch) {
for cs_output in output {
fault_log.extend(self.send_decryption_shares(cs_output)?);
// TODO: May also check that there is no further output from Common Subset.
}
}
Ok(fault_log)
}
@ -695,14 +696,20 @@ impl<NodeUid: Rand> Message<NodeUid> {
/// The queue of outgoing messages in a `HoneyBadger` instance.
#[derive(Deref, DerefMut)]
struct MessageQueue<NodeUid: Rand>(VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>);
pub(crate) struct MessageQueue<NodeUid: Rand>(
pub(crate) VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>,
);
impl<NodeUid: Clone + Debug + Ord + Rand> MessageQueue<NodeUid> {
/// Appends to the queue the messages from `cs`, wrapped with `epoch`.
fn extend_with_epoch(&mut self, epoch: u64, cs: &mut CommonSubset<NodeUid>) {
fn extend_with_epoch(
&mut self,
epoch: u64,
msgs: &mut VecDeque<TargetedMessage<common_subset::Message<NodeUid>, NodeUid>>,
) {
let convert = |msg: TargetedMessage<common_subset::Message<NodeUid>, NodeUid>| {
msg.map(|cs_msg| MessageContent::CommonSubset(cs_msg).with_epoch(epoch))
};
self.extend(cs.message_iter().map(convert));
self.extend(msgs.drain(..).map(convert));
}
}

View File

@ -52,35 +52,47 @@ impl<M, N> TargetedMessage<M, N> {
/// Result of one step of the local state machine of a distributed algorithm. Such a result should
/// be used and never discarded by the client of the algorithm.
#[must_use = "The algorithm step result must be used."]
pub struct Step<N, O>
pub struct Step<N, O, M>
where
N: Clone,
{
pub output: VecDeque<O>,
pub fault_log: FaultLog<N>,
pub messages: VecDeque<TargetedMessage<M, N>>,
}
impl<N, O> Default for Step<N, O>
impl<N, O, M> Default for Step<N, O, M>
where
N: Clone,
{
fn default() -> Step<N, O> {
fn default() -> Step<N, O, M> {
Step {
output: Default::default(),
output: VecDeque::default(),
fault_log: FaultLog::default(),
messages: VecDeque::default(),
}
}
}
impl<N, O> Step<N, O>
impl<N, O, M> Step<N, O, M>
where
N: Clone,
{
pub fn new(output: VecDeque<O>, fault_log: FaultLog<N>) -> Self {
Step { output, fault_log }
pub fn new(
output: VecDeque<O>,
fault_log: FaultLog<N>,
messages: VecDeque<TargetedMessage<M, N>>,
) -> Self {
Step {
output,
fault_log,
messages,
}
}
}
type StepResult<N, O, M, E> = Result<Step<N, O, M>, E>;
/// A distributed algorithm that defines a message flow.
pub trait DistAlgorithm {
/// Unique node identifier.
@ -99,44 +111,20 @@ pub trait DistAlgorithm {
fn input(
&mut self,
input: Self::Input,
) -> Result<Step<Self::NodeUid, Self::Output>, Self::Error>;
) -> StepResult<Self::NodeUid, Self::Output, Self::Message, Self::Error>;
/// Handles a message received from node `sender_id`.
fn handle_message(
&mut self,
sender_id: &Self::NodeUid,
message: Self::Message,
) -> Result<Step<Self::NodeUid, Self::Output>, Self::Error>;
/// Returns a message that needs to be sent to another node.
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, Self::NodeUid>>;
) -> StepResult<Self::NodeUid, Self::Output, Self::Message, Self::Error>;
/// Returns `true` if execution has completed and this instance can be dropped.
fn terminated(&self) -> bool;
/// Returns this node's own ID.
fn our_id(&self) -> &Self::NodeUid;
/// Returns an iterator over the outgoing messages.
fn message_iter(&mut self) -> MessageIter<Self>
where
Self: Sized,
{
MessageIter { algorithm: self }
}
}
/// An iterator over a distributed algorithm's outgoing messages.
pub struct MessageIter<'a, D: DistAlgorithm + 'a> {
algorithm: &'a mut D,
}
impl<'a, D: DistAlgorithm + 'a> Iterator for MessageIter<'a, D> {
type Item = TargetedMessage<D::Message, D::NodeUid>;
fn next(&mut self) -> Option<Self::Item> {
self.algorithm.next_message()
}
}
/// Common data shared between algorithms: the nodes' IDs and key shares.

View File

@ -119,7 +119,7 @@ where
output: VecDeque<Batch<Tx, NodeUid>>,
}
pub type QueueingHoneyBadgerStep<Tx, NodeUid> = Step<NodeUid, Batch<Tx, NodeUid>>;
pub type QueueingHoneyBadgerStep<Tx, NodeUid> = Step<NodeUid, Batch<Tx, NodeUid>, Message<NodeUid>>;
impl<Tx, NodeUid> DistAlgorithm for QueueingHoneyBadger<Tx, NodeUid>
where
@ -135,18 +135,18 @@ where
fn input(&mut self, input: Self::Input) -> Result<QueueingHoneyBadgerStep<Tx, NodeUid>> {
// User transactions are forwarded to `HoneyBadger` right away. Internal messages are
// in addition signed and broadcast.
let fault_log = match input {
let (fault_log, messages) = match input {
Input::User(tx) => {
self.queue.0.push_back(tx);
FaultLog::new()
(FaultLog::new(), VecDeque::new())
}
Input::Change(change) => {
let step = self.dyn_hb.input(Input::Change(change))?;
// FIXME: Use the output since `dyn_hb` can output immediately on input.
step.fault_log
(step.fault_log, step.messages)
}
};
self.step(fault_log)
self.step(fault_log, messages)
}
fn handle_message(
@ -157,17 +157,16 @@ where
let Step {
output,
mut fault_log,
mut messages,
} = self.dyn_hb.handle_message(sender_id, message)?;
for batch in output {
self.queue.remove_all(batch.iter());
self.output.push_back(batch);
}
fault_log.extend(self.propose()?);
self.step(fault_log)
}
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
self.dyn_hb.next_message()
let (propose_fault_log, propose_messages) = self.propose()?;
fault_log.extend(propose_fault_log);
messages.extend(propose_messages);
self.step(fault_log, messages)
}
fn terminated(&self) -> bool {
@ -195,8 +194,14 @@ where
fn step(
&mut self,
fault_log: FaultLog<NodeUid>,
messages: VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>,
) -> Result<QueueingHoneyBadgerStep<Tx, NodeUid>> {
Ok(Step::new(self.output.drain(..).collect(), fault_log))
Ok(Step::new(
self.output.drain(..).collect(),
fault_log,
messages,
//self.dyn_hb.messages.drain(..).collect(),
))
}
/// Returns a reference to the internal `DynamicHoneyBadger` instance.
@ -205,20 +210,27 @@ where
}
/// Initiates the next epoch by proposing a batch from the queue.
fn propose(&mut self) -> Result<FaultLog<NodeUid>> {
fn propose(
&mut self,
) -> Result<(
FaultLog<NodeUid>,
VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>,
)> {
let amount = cmp::max(1, self.batch_size / self.dyn_hb.netinfo().num_nodes());
// TODO: This will loop forever if we are the only validator.
let mut fault_log = FaultLog::new();
let mut messages = VecDeque::new();
while !self.dyn_hb.has_input() {
let proposal = self.queue.choose(amount, self.batch_size);
let step = self.dyn_hb.input(Input::User(proposal))?;
fault_log.extend(step.fault_log);
messages.extend(step.messages);
for batch in step.output {
self.queue.remove_all(batch.iter());
self.output.push_back(batch);
}
}
Ok(fault_log)
Ok((fault_log, messages))
}
}

View File

@ -80,8 +80,9 @@ impl Adversary<Broadcast<NodeUid>> for ProposeAdversary {
let netinfo = Arc::new(NetworkInfo::generate_map(node_ids).remove(&id).unwrap());
let mut bc = Broadcast::new(netinfo, id).expect("broadcast instance");
// FIXME: Use the output.
let _ = bc.input(b"Fake news".to_vec()).expect("propose");
bc.message_iter()
let step = bc.input(b"Fake news".to_vec()).expect("propose");
step.messages
.into_iter()
.map(|msg| MessageWithSender::new(id, msg))
.collect()
}

View File

@ -22,6 +22,8 @@ pub struct TestNode<D: DistAlgorithm> {
pub queue: VecDeque<(D::NodeUid, D::Message)>,
/// The values this node has output so far.
outputs: Vec<D::Output>,
/// Outgoing messages to be sent to other nodes.
messages: VecDeque<TargetedMessage<D::Message, D::NodeUid>>,
}
impl<D: DistAlgorithm> TestNode<D> {
@ -40,6 +42,7 @@ impl<D: DistAlgorithm> TestNode<D> {
pub fn input(&mut self, input: D::Input) {
let step = self.algo.input(input).expect("input");
self.outputs.extend(step.output);
self.messages.extend(step.messages);
}
/// Returns the internal algorithm's instance.
@ -55,6 +58,7 @@ impl<D: DistAlgorithm> TestNode<D> {
algo,
queue: VecDeque::new(),
outputs: Vec::new(),
messages: VecDeque::new(),
}
}
@ -67,6 +71,7 @@ impl<D: DistAlgorithm> TestNode<D> {
.handle_message(&from_id, msg)
.expect("handling message");
self.outputs.extend(step.output);
self.messages.extend(step.messages);
}
/// Checks whether the node has messages to process
@ -412,7 +417,7 @@ where
}
let mut initial_msgs: Vec<(D::NodeUid, Vec<_>)> = Vec::new();
for (id, node) in &mut network.nodes {
initial_msgs.push((*id, node.algo.message_iter().collect()));
initial_msgs.push((*id, node.messages.drain(..).collect()));
}
for (id, msgs) in initial_msgs {
network.dispatch_messages(id, msgs);
@ -476,7 +481,7 @@ where
// The node handles the incoming message and creates new outgoing ones to be dispatched.
let msgs: Vec<_> = {
let node = self.nodes.get_mut(&id).unwrap();
let mut node = self.nodes.get_mut(&id).unwrap();
// Ensure the adversary is playing fair by selecting a node that will result in actual
// progress being made, otherwise `TestNode::handle_message()` will panic on `expect()`
@ -487,7 +492,7 @@ where
);
node.handle_message();
node.algo.message_iter().collect()
node.messages.drain(..).collect()
};
self.dispatch_messages(id, msgs);
@ -497,9 +502,9 @@ where
/// Inputs a value in node `id`.
pub fn input(&mut self, id: NodeUid, value: D::Input) {
let msgs: Vec<_> = {
let node = self.nodes.get_mut(&id).expect("input instance");
let mut node = self.nodes.get_mut(&id).expect("input instance");
node.input(value);
node.algo.message_iter().collect()
node.messages.drain(..).collect()
};
self.dispatch_messages(id, msgs);
}