Start implementing the top-level Honey Badger algorithm.

This also contains a few fixes for the `common_subset` module:
* Rename `common_subset::Output` to `Message` to avoid confusing it
  with the value that the algorithm outputs as a result.
* Implement dispatch of messages to the right instance within
  `CommonSubset`, in a way that is transparent to the user.
This commit is contained in:
Andreas Fackler 2018-05-12 16:09:07 +02:00
parent a83536dc6f
commit 38cdd596a2
8 changed files with 309 additions and 98 deletions

View File

@ -4,13 +4,15 @@ version = "0.1.0"
authors = ["Vladimir Komendantskiy <komendantsky@gmail.com>"]
[dependencies]
bincode = "1.0.0"
env_logger = "0.5.10"
itertools = "0.7"
log = "0.4.1"
reed-solomon-erasure = "3.0"
merkle = { git = "https://github.com/vkomenda/merkle.rs", branch = "public-proof" }
protobuf = "1.4.4"
reed-solomon-erasure = "3.0"
ring = "^0.12"
serde = "1.0.54"
[build-dependencies]
protoc-rust = "1.4.4"

View File

@ -41,7 +41,7 @@ use std::marker::{Send, Sync};
use std::net::SocketAddr;
use std::{io, iter, process, thread, time};
use hbbft::broadcast::{Broadcast, BroadcastMessage, TargetedBroadcastMessage};
use hbbft::broadcast::{Broadcast, BroadcastMessage};
use hbbft::messaging::SourcedMessage;
use hbbft::proto::message::BroadcastProto;
use network::commst;
@ -130,8 +130,6 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
for msg in broadcast
.propose_value(v.clone().into())
.expect("propose value")
.into_iter()
.map(TargetedBroadcastMessage::into)
{
tx_from_algo.send(msg).expect("send from algo");
}
@ -148,7 +146,7 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
for msg in &msgs {
debug!("{} sending to {:?}: {:?}", our_id, msg.target, msg.message);
}
for msg in msgs.into_iter().map(TargetedBroadcastMessage::into) {
for msg in msgs {
tx_from_algo.send(msg).expect("send from algo");
}
if let Some(output) = opt_output {

View File

@ -11,7 +11,7 @@ use std::sync::{RwLock, RwLockWriteGuard};
use messaging::{Target, TargetedMessage};
type MessageQueue<NodeUid> = VecDeque<TargetedBroadcastMessage<NodeUid>>;
type MessageQueue<NodeUid> = VecDeque<TargetedMessage<BroadcastMessage, NodeUid>>;
/// The three kinds of message sent during the reliable broadcast stage of the
/// consensus algorithm.
@ -22,22 +22,6 @@ pub enum BroadcastMessage {
Ready(Vec<u8>),
}
impl BroadcastMessage {
fn target_all<NodeUid>(self) -> TargetedBroadcastMessage<NodeUid> {
TargetedBroadcastMessage {
target: Target::All,
message: self,
}
}
fn target_node<NodeUid>(self, id: NodeUid) -> TargetedBroadcastMessage<NodeUid> {
TargetedBroadcastMessage {
target: Target::Node(id),
message: self,
}
}
}
impl fmt::Debug for BroadcastMessage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
@ -48,22 +32,6 @@ impl fmt::Debug for BroadcastMessage {
}
}
/// A `BroadcastMessage` to be sent out, together with a target.
#[derive(Clone, Debug)]
pub struct TargetedBroadcastMessage<NodeUid> {
pub target: Target<NodeUid>,
pub message: BroadcastMessage,
}
impl From<TargetedBroadcastMessage<usize>> for TargetedMessage<BroadcastMessage, usize> {
fn from(msg: TargetedBroadcastMessage<usize>) -> TargetedMessage<BroadcastMessage, usize> {
TargetedMessage {
target: msg.target,
message: msg.message,
}
}
}
struct BroadcastState<NodeUid> {
/// Whether we have already multicas `Echo`.
echo_sent: bool,
@ -276,7 +244,8 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
result = Ok(proof);
} else {
// Rest of the proofs are sent to remote nodes.
outgoing.push_back(BroadcastMessage::Value(proof).target_node(uid.clone()));
let msg = BroadcastMessage::Value(proof);
outgoing.push_back(Target::Node(uid.clone()).message(msg));
}
}
@ -327,7 +296,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
// Otherwise multicast the proof in an `Echo` message, and handle it ourselves.
state.echo_sent = true;
let (output, echo_msgs) = self.handle_echo(&self.our_id, p.clone(), state)?;
let msgs = iter::once(BroadcastMessage::Echo(p).target_all())
let msgs = iter::once(Target::All.message(BroadcastMessage::Echo(p)))
.chain(echo_msgs)
.collect();
@ -364,7 +333,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
// Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`.
state.ready_sent = true;
let msg = BroadcastMessage::Ready(hash.clone()).target_all();
let msg = Target::All.message(BroadcastMessage::Ready(hash.clone()));
let (output, ready_msgs) = self.handle_ready(&self.our_id, &hash, state)?;
Ok((output, iter::once(msg).chain(ready_msgs).collect()))
}
@ -393,7 +362,7 @@ impl<NodeUid: Eq + Hash + Debug + Clone + Ord> Broadcast<NodeUid> {
{
// Enqueue a broadcast of a Ready message.
state.ready_sent = true;
iter::once(BroadcastMessage::Ready(hash.to_vec()).target_all()).collect()
iter::once(Target::All.message(BroadcastMessage::Ready(hash.to_vec()))).collect()
} else {
VecDeque::new()
};

View File

@ -11,23 +11,50 @@ use agreement;
use agreement::{Agreement, AgreementMessage};
use broadcast;
use broadcast::{Broadcast, BroadcastMessage, TargetedBroadcastMessage};
use broadcast::{Broadcast, BroadcastMessage};
use messaging::{Target, TargetedMessage};
// TODO: Make this a generic argument of `Broadcast`.
type ProposedValue = Vec<u8>;
// Type of output from the Common Subset message handler.
type CommonSubsetOutput<NodeUid> = (Option<HashSet<ProposedValue>>, VecDeque<Output<NodeUid>>);
type CommonSubsetOutput<NodeUid> = (
Option<HashSet<ProposedValue>>,
VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>,
);
/// Output from Common Subset to remote nodes.
pub enum Output<NodeUid> {
/// A broadcast message to be sent to the destination set in the
/// `TargetedBroadcastMessage`.
Broadcast(TargetedBroadcastMessage<NodeUid>),
/// An agreement message to be broadcast to all nodes. There are no
/// one-to-one agreement messages.
Agreement(AgreementMessage),
/// Message from Common Subset to remote nodes.
pub enum Message<NodeUid> {
/// A message for the broadcast algorithm concerning the set element proposed by the given node.
Broadcast(NodeUid, BroadcastMessage),
/// A message for the agreement algorithm concerning the set element proposed by the given
/// node.
Agreement(NodeUid, AgreementMessage),
}
/// Asynchronous Common Subset algorithm instance
///
/// The Asynchronous Common Subset protocol assumes a network of `N` nodes that send signed
/// messages to each other, with at most `f` of them malicious, where `3 * f < N`. Handling the
/// networking and signing is the responsibility of the user: only when a message has been
/// verified to be "from node i", it can be handed to the `CommonSubset` instance.
///
/// Each participating node proposes an element for inclusion. Under the above conditions, the
/// protocol guarantees that all of the good nodes output the same set, consisting of at least
/// `N - f` of the proposed elements.
///
/// The algorithm works as follows:
///
/// * `CommonSubset` instantiates one `Broadcast` algorithm for each of the participating nodes.
/// At least `N - f` of these - the ones whose proposer is not malicious - will eventually output
/// the element proposed by that node.
/// * It also instantiates an `Agreement` instance for each participating node, to decide whether
/// that node's proposed element should be included in the common set. Whenever an element is
/// received via broadcast, we input "yes" (`true`) into the corresponding `Agreement` instance.
/// * When `N - f` `Agreement` instances have decided "yes", we input "no" (`false`) into the
/// remaining ones, where we haven't provided input yet.
/// * Once all `Agreement` instances have decided, `CommonSubset` returns the set of all proposed
/// values for which the decision was "yes".
pub struct CommonSubset<NodeUid: Eq + Hash + Ord> {
uid: NodeUid,
num_nodes: usize,
@ -78,13 +105,13 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
pub fn send_proposed_value(
&self,
value: ProposedValue,
) -> Result<VecDeque<Output<NodeUid>>, Error> {
) -> Result<VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>, Error> {
// Upon receiving input v_i , input v_i to RBC_i. See Figure 2.
if let Some(instance) = self.broadcast_instances.get(&self.uid) {
Ok(instance
.propose_value(value)?
.into_iter()
.map(Output::Broadcast)
.map(|msg| msg.map(|b_msg| Message::Broadcast(self.uid.clone(), b_msg)))
.collect())
} else {
Err(Error::NoSuchBroadcastInstance)
@ -105,25 +132,47 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
}
}
/// Receives a message form a remote node `sender_id`, and returns an optional result of the
/// Common Subset algorithm - a set of proposed values - and a queue of messages to be sent to
/// remote nodes, or an error.
pub fn handle_message(
&mut self,
sender_id: &NodeUid,
message: Message<NodeUid>,
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
match message {
Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg),
Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, &a_msg),
}
}
/// Receives a broadcast message from a remote node `sender_id` concerning a
/// value proposed by the node `proposer_id`. The output contains an
/// optional result of the Common Subset algorithm - a set of proposed
/// values - and a queue of messages to be sent to remote nodes, or an
/// error.
pub fn handle_broadcast(
fn handle_broadcast(
&mut self,
sender_id: &NodeUid,
proposer_id: &NodeUid,
bmessage: BroadcastMessage,
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
let mut instance_result = None;
let input_result: Result<VecDeque<Output<NodeUid>>, Error> = {
let input_result: Result<
VecDeque<TargetedMessage<Message<NodeUid>, NodeUid>>,
Error,
> = {
if let Some(broadcast_instance) = self.broadcast_instances.get(proposer_id) {
broadcast_instance
.handle_broadcast_message(sender_id, bmessage)
.map(|(opt_value, queue)| {
instance_result = opt_value;
queue.into_iter().map(Output::Broadcast).collect()
queue
.into_iter()
.map(|msg| {
msg.map(|b_msg| Message::Broadcast(proposer_id.clone(), b_msg))
})
.collect()
})
.map_err(Error::from)
} else {
@ -138,7 +187,9 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
input_result.map(|mut queue| {
if let Some(agreement_message) = opt_message {
// Append the message to agreement nodes to the common output queue.
queue.push_back(Output::Agreement(agreement_message))
queue.push_back(
Target::All.message(Message::Agreement(proposer_id.clone(), agreement_message)),
);
}
(None, queue)
})
@ -149,7 +200,7 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
/// optional result of the Common Subset algorithm - a set of proposed
/// values - and a queue of messages to be sent to remote nodes, or an
/// error.
pub fn handle_agreement(
fn handle_agreement(
&mut self,
sender_id: &NodeUid,
proposer_id: &NodeUid,
@ -173,22 +224,19 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
}
}
if let Ok((output, mut outgoing)) = result {
// Process Agreement outputs.
if let Some(b) = output {
outgoing.append(&mut self.on_agreement_result(proposer_id, b)?);
}
let (output, mut outgoing) = result?;
// Check whether Agreement has completed.
Ok((
self.try_agreement_completion(),
outgoing.into_iter().map(Output::Agreement).collect(),
))
} else {
// error
result
.map(|(_, messages)| (None, messages.into_iter().map(Output::Agreement).collect()))
// Process Agreement outputs.
if let Some(b) = output {
outgoing.append(&mut self.on_agreement_result(proposer_id, b)?);
}
// Check whether Agreement has completed.
let into_msg = |a_msg| Target::All.message(Message::Agreement(proposer_id.clone(), a_msg));
Ok((
self.try_agreement_completion(),
outgoing.into_iter().map(into_msg).collect(),
))
}
/// Callback to be invoked on receipt of a returned value of the Agreement
@ -198,26 +246,28 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
element_proposer_id: &NodeUid,
result: bool,
) -> Result<VecDeque<AgreementMessage>, Error> {
let mut outgoing = VecDeque::new();
self.agreement_results
.insert(element_proposer_id.clone(), result);
if !result || self.count_true() < self.num_nodes - self.num_faulty_nodes {
return Ok(VecDeque::new());
}
// Upon delivery of value 1 from at least N f instances of BA, provide
// input 0 to each instance of BA that has not yet been provided input.
if result {
self.agreement_results
.insert(element_proposer_id.clone(), result);
// The number of instances of BA that output 1.
let results1 = self.agreement_results.values().filter(|v| **v).count();
if results1 >= self.num_nodes - self.num_faulty_nodes {
for instance in self.agreement_instances.values_mut() {
if instance.accepts_input() {
outgoing.push_back(instance.set_input(false)?);
}
}
let mut outgoing = VecDeque::new();
for instance in self.agreement_instances.values_mut() {
if instance.accepts_input() {
outgoing.push_back(instance.set_input(false)?);
}
}
Ok(outgoing)
}
/// Returns the number of agreement instances that have decided "yes".
fn count_true(&self) -> usize {
self.agreement_results.values().filter(|v| **v).count()
}
fn try_agreement_completion(&self) -> Option<HashSet<ProposedValue>> {
// Once all instances of BA have completed, let C ⊂ [1..N] be
// the indexes of each BA that delivered 1. Wait for the output

168
src/honey_badger.rs Normal file
View File

@ -0,0 +1,168 @@
use std::collections::{HashSet, VecDeque};
use std::fmt::{Debug, Display};
use std::hash::Hash;
use bincode;
use serde::de::DeserializeOwned;
use serde::Serialize;
use common_subset::{self, CommonSubset};
use messaging::TargetedMessage;
/// An instance of the Honey Badger Byzantine fault tolerant consensus algorithm.
pub struct HoneyBadger<T, N: Eq + Hash + Ord + Clone + Display> {
/// The buffer of transactions that have not yet been included in any batch.
buffer: VecDeque<T>,
/// The current epoch, i.e. the number of batches that have been output so far.
epoch: u64,
/// The Asynchronous Common Subset instance that decides which nodes' transactions to include.
// TODO: Common subset could be optimized to output before it is allowed to terminate. In that
// case, we would need to keep track of one or two previous instances, too.
common_subset: CommonSubset<N>,
/// This node's ID.
id: N,
/// The set of all node IDs of the participants (including ourselves).
all_ids: HashSet<N>,
/// The target number of transactions to be included in each batch.
// TODO: Do experiments and recommend a batch size. It should be proportional to
// `num_nodes * num_nodes * log(num_nodes)`.
batch_size: usize,
}
// TODO: Use a threshold encryption scheme to encrypt the proposed transactions.
// TODO: We only contribute a proposal to the next round once we have `batch_size` buffered
// transactions. This should be more configurable: `min_batch_size`, `max_batch_size` and maybe a
// timeout? The paper assumes that all nodes will often have more or less the same set of
// transactions in the buffer; if the sets are disjoint on average, we can just send our whole
// buffer instead of 1/n of it.
impl<T, N> HoneyBadger<T, N>
where
T: Ord + AsRef<[u8]> + Serialize + DeserializeOwned,
N: Eq + Hash + Ord + Clone + Display + Debug,
{
/// Returns a new Honey Badger instance with the given parameters, starting at epoch `0`.
pub fn new<I>(id: N, all_ids_iter: I, batch_size: usize) -> Result<Self, Error>
where
I: IntoIterator<Item = N>,
{
let all_ids: HashSet<N> = all_ids_iter.into_iter().collect();
if !all_ids.contains(&id) {
return Err(Error::OwnIdMissing);
}
Ok(HoneyBadger {
buffer: VecDeque::new(),
epoch: 0,
common_subset: CommonSubset::new(id.clone(), &all_ids)?,
id,
batch_size,
all_ids,
})
}
/// Adds transactions into the buffer.
pub fn add_transactions<I>(&mut self, txs: I) -> Result<HoneyBadgerOutput<T, N>, Error>
where
I: IntoIterator<Item = T>,
{
self.buffer.extend(txs);
if self.buffer.len() < self.batch_size {
return Ok((None, Vec::new()));
}
// TODO: Handle the case `all_ids.len() == 1`.
let share = bincode::serialize(&self.buffer)?;
let msgs = self.common_subset
.send_proposed_value(share)?
.into_iter()
.map(|targeted_msg| {
targeted_msg.map(|cs_msg| Message::CommonSubset(self.epoch, cs_msg))
})
.collect();
Ok((None, msgs))
}
/// Handles a message from another node, and returns the next batch, if any, and the messages
/// to be sent out.
pub fn handle_message(
&mut self,
sender_id: &N,
message: Message<N>,
) -> Result<HoneyBadgerOutput<T, N>, Error> {
match message {
Message::CommonSubset(epoch, cs_msg) => {
self.handle_common_subset_message(sender_id, epoch, cs_msg)
}
}
}
fn handle_common_subset_message(
&mut self,
sender_id: &N,
epoch: u64,
message: common_subset::Message<N>,
) -> Result<HoneyBadgerOutput<T, N>, Error> {
if epoch != self.epoch {
// TODO: Do we need to cache messages for future epochs?
return Ok((None, Vec::new()));
}
let (cs_out, cs_msgs) = self.common_subset.handle_message(sender_id, message)?;
let mut msgs: Vec<TargetedMessage<Message<N>, N>> = cs_msgs
.into_iter()
.map(|targeted_msg| targeted_msg.map(|cs_msg| Message::CommonSubset(epoch, cs_msg)))
.collect();
let output = if let Some(ser_batches) = cs_out {
let mut transactions: Vec<T> = ser_batches
.into_iter()
.map(|ser_batch| bincode::deserialize::<Vec<T>>(&ser_batch))
.collect::<Result<Vec<_>, Box<bincode::ErrorKind>>>()?
.into_iter()
.flat_map(|txs| txs)
.collect();
transactions.sort();
self.epoch += 1;
self.common_subset = CommonSubset::new(self.id.clone(), &self.all_ids)?;
let (_, new_epoch_msgs) = self.add_transactions(None)?;
msgs.extend(new_epoch_msgs);
Some(Batch {
epoch,
transactions,
})
} else {
None
};
Ok((output, msgs))
}
}
type HoneyBadgerOutput<T, N> = (Option<Batch<T>>, Vec<TargetedMessage<Message<N>, N>>);
/// A batch of transactions the algorithm has output.
pub struct Batch<T> {
pub epoch: u64,
pub transactions: Vec<T>,
}
/// A message sent to or received from another node's Honey Badger instance.
pub enum Message<N> {
/// A message belonging to the common subset algorithm in the given epoch.
CommonSubset(u64, common_subset::Message<N>),
// TODO: Decryption share.
}
/// A Honey Badger error.
pub enum Error {
OwnIdMissing,
CommonSubset(common_subset::Error),
Bincode(Box<bincode::ErrorKind>),
}
impl From<common_subset::Error> for Error {
fn from(err: common_subset::Error) -> Error {
Error::CommonSubset(err)
}
}
impl From<Box<bincode::ErrorKind>> for Error {
fn from(err: Box<bincode::ErrorKind>) -> Error {
Error::Bincode(err)
}
}

View File

@ -4,6 +4,8 @@
//! honey badger of BFT protocols" after a paper with the same title.
#![feature(optin_builtin_traits)]
extern crate bincode;
#[macro_use]
extern crate log;
extern crate itertools;
@ -11,10 +13,12 @@ extern crate merkle;
extern crate protobuf;
extern crate reed_solomon_erasure;
extern crate ring;
extern crate serde;
pub mod agreement;
pub mod broadcast;
pub mod common_subset;
pub mod honey_badger;
pub mod messaging;
pub mod proto;
pub mod proto_io;

View File

@ -17,9 +17,29 @@ pub enum Target<N> {
Node(N),
}
impl<N> Target<N> {
/// Returns a `TargetedMessage` with this target, and the given message.
pub fn message<M>(self, message: M) -> TargetedMessage<M, N> {
TargetedMessage {
target: self,
message,
}
}
}
/// Message with a designated target.
#[derive(Clone, Debug, PartialEq)]
pub struct TargetedMessage<M, N> {
pub target: Target<N>,
pub message: M,
}
impl<M, N> TargetedMessage<M, N> {
/// Applies the given transformation of messages, preserving the target.
pub fn map<T, F: Fn(M) -> T>(self, f: F) -> TargetedMessage<T, N> {
TargetedMessage {
target: self.target,
message: f(self.message),
}
}
}

View File

@ -13,15 +13,15 @@ use rand::Rng;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use hbbft::broadcast::{Broadcast, BroadcastMessage, TargetedBroadcastMessage};
use hbbft::messaging::Target;
use hbbft::broadcast::{Broadcast, BroadcastMessage};
use hbbft::messaging::{Target, TargetedMessage};
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy)]
struct NodeId(usize);
type ProposedValue = Vec<u8>;
type MessageQueue = VecDeque<TargetedBroadcastMessage<NodeId>>;
type MessageQueue = VecDeque<TargetedMessage<BroadcastMessage, NodeId>>;
/// A "node" running a broadcast instance.
struct TestNode {
@ -97,10 +97,10 @@ trait Adversary {
fn pick_node(&self, nodes: &BTreeMap<NodeId, TestNode>) -> NodeId;
/// Adds a message sent to one of the adversary's nodes.
fn push_message(&mut self, sender_id: NodeId, msg: TargetedBroadcastMessage<NodeId>);
fn push_message(&mut self, sender_id: NodeId, msg: TargetedMessage<BroadcastMessage, NodeId>);
/// Produces a list of messages to be sent from the adversary's nodes.
fn step(&mut self) -> Vec<(NodeId, TargetedBroadcastMessage<NodeId>)>;
fn step(&mut self) -> Vec<(NodeId, TargetedMessage<BroadcastMessage, NodeId>)>;
}
/// An adversary whose nodes never send any messages.
@ -120,11 +120,11 @@ impl Adversary for SilentAdversary {
self.scheduler.pick_node(nodes)
}
fn push_message(&mut self, _: NodeId, _: TargetedBroadcastMessage<NodeId>) {
fn push_message(&mut self, _: NodeId, _: TargetedMessage<BroadcastMessage, NodeId>) {
// All messages are ignored.
}
fn step(&mut self) -> Vec<(NodeId, TargetedBroadcastMessage<NodeId>)> {
fn step(&mut self) -> Vec<(NodeId, TargetedMessage<BroadcastMessage, NodeId>)> {
vec![] // No messages are sent.
}
}
@ -158,11 +158,11 @@ impl Adversary for ProposeAdversary {
self.scheduler.pick_node(nodes)
}
fn push_message(&mut self, _: NodeId, _: TargetedBroadcastMessage<NodeId>) {
fn push_message(&mut self, _: NodeId, _: TargetedMessage<BroadcastMessage, NodeId>) {
// All messages are ignored.
}
fn step(&mut self) -> Vec<(NodeId, TargetedBroadcastMessage<NodeId>)> {
fn step(&mut self) -> Vec<(NodeId, TargetedMessage<BroadcastMessage, NodeId>)> {
if self.has_sent {
return vec![];
}
@ -212,11 +212,11 @@ impl<A: Adversary> TestNetwork<A> {
/// Pushes the messages into the queues of the corresponding recipients.
fn dispatch_messages<Q>(&mut self, sender_id: NodeId, msgs: Q)
where
Q: IntoIterator<Item = TargetedBroadcastMessage<NodeId>> + fmt::Debug,
Q: IntoIterator<Item = TargetedMessage<BroadcastMessage, NodeId>> + fmt::Debug,
{
for msg in msgs {
match msg {
TargetedBroadcastMessage {
TargetedMessage {
target: Target::All,
ref message,
} => {
@ -227,7 +227,7 @@ impl<A: Adversary> TestNetwork<A> {
}
self.adversary.push_message(sender_id, msg.clone());
}
TargetedBroadcastMessage {
TargetedMessage {
target: Target::Node(to_id),
ref message,
} => {