mirror of https://github.com/poanetwork/hbbft.git
test broadcast in a single thread
This commit is contained in:
parent
bb765cbb06
commit
f710806b17
320
src/broadcast.rs
320
src/broadcast.rs
|
@ -7,14 +7,40 @@ use proto::*;
|
|||
use reed_solomon_erasure as rse;
|
||||
use reed_solomon_erasure::ReedSolomon;
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::fmt::Debug;
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::hash::Hash;
|
||||
use std::marker::{Send, Sync};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
|
||||
use messaging;
|
||||
use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, NodeUid,
|
||||
ProposedValue, QMessage, RemoteMessage, RemoteNode, SourcedMessage, Target,
|
||||
TargetedMessage};
|
||||
use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, ProposedValue,
|
||||
QMessage, RemoteMessage, RemoteNode, SourcedMessage, Target, TargetedMessage};
|
||||
|
||||
/// A `BroadcastMessage` to be sent out, together with a target.
|
||||
#[derive(Debug)]
|
||||
pub struct TargetedBroadcastMessage<NodeUid> {
|
||||
pub target: BroadcastTarget<NodeUid>,
|
||||
pub message: BroadcastMessage<ProposedValue>,
|
||||
}
|
||||
|
||||
impl TargetedBroadcastMessage<messaging::NodeUid> {
|
||||
pub fn into_remote_message(self) -> RemoteMessage {
|
||||
RemoteMessage {
|
||||
node: match self.target {
|
||||
BroadcastTarget::All => RemoteNode::All,
|
||||
BroadcastTarget::Node(node) => RemoteNode::Node(node),
|
||||
},
|
||||
message: Message::Broadcast(self.message),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A target node for a `BroadcastMessage`.
|
||||
#[derive(Debug)]
|
||||
pub enum BroadcastTarget<NodeUid> {
|
||||
All,
|
||||
Node(NodeUid),
|
||||
}
|
||||
|
||||
struct BroadcastState {
|
||||
root_hash: Option<Vec<u8>>,
|
||||
|
@ -27,7 +53,7 @@ struct BroadcastState {
|
|||
}
|
||||
|
||||
/// Reliable Broadcast algorithm instance.
|
||||
pub struct Broadcast {
|
||||
pub struct Broadcast<NodeUid: Eq + Hash> {
|
||||
/// The UID of this node.
|
||||
uid: NodeUid,
|
||||
/// UIDs of all nodes for iteration purposes.
|
||||
|
@ -42,7 +68,96 @@ pub struct Broadcast {
|
|||
state: RwLock<BroadcastState>,
|
||||
}
|
||||
|
||||
impl Broadcast {
|
||||
impl Broadcast<messaging::NodeUid> {
|
||||
/// The message-driven interface function for calls from the main message
|
||||
/// loop.
|
||||
pub fn on_message(
|
||||
&self,
|
||||
m: QMessage,
|
||||
tx: &Sender<QMessage>,
|
||||
) -> Result<MessageLoopState, Error> {
|
||||
match m {
|
||||
QMessage::Local(LocalMessage { message, .. }) => match message {
|
||||
AlgoMessage::BroadcastInput(value) => self.on_local_message(&mut value.to_owned()),
|
||||
|
||||
_ => Err(Error::UnexpectedMessage),
|
||||
},
|
||||
|
||||
QMessage::Remote(RemoteMessage {
|
||||
node: RemoteNode::Node(uid),
|
||||
message,
|
||||
}) => {
|
||||
if let Message::Broadcast(b) = message {
|
||||
self.on_remote_message(uid, &b, tx)
|
||||
} else {
|
||||
Err(Error::UnexpectedMessage)
|
||||
}
|
||||
}
|
||||
|
||||
_ => Err(Error::UnexpectedMessage),
|
||||
}
|
||||
}
|
||||
|
||||
fn on_remote_message(
|
||||
&self,
|
||||
uid: messaging::NodeUid,
|
||||
message: &BroadcastMessage<ProposedValue>,
|
||||
tx: &Sender<QMessage>,
|
||||
) -> Result<MessageLoopState, Error> {
|
||||
let (output, messages) = self.handle_broadcast_message(&uid, message)?;
|
||||
if let Some(value) = output {
|
||||
tx.send(QMessage::Local(LocalMessage {
|
||||
dst: Algorithm::CommonSubset,
|
||||
message: AlgoMessage::BroadcastOutput(self.uid, value),
|
||||
})).map_err(Error::from)?;
|
||||
}
|
||||
Ok(MessageLoopState::Processing(
|
||||
messages
|
||||
.into_iter()
|
||||
.map(TargetedBroadcastMessage::into_remote_message)
|
||||
.collect(),
|
||||
))
|
||||
}
|
||||
|
||||
/// Processes the proposed value input by broadcasting it.
|
||||
pub fn on_local_message(&self, value: &mut ProposedValue) -> Result<MessageLoopState, Error> {
|
||||
let mut state = self.state.write().unwrap();
|
||||
// Split the value into chunks/shards, encode them with erasure codes.
|
||||
// Assemble a Merkle tree from data and parity shards. Take all proofs
|
||||
// from this tree and send them, each to its own node.
|
||||
self.send_shards(value.clone())
|
||||
.map(|(proof, remote_messages)| {
|
||||
// Record the first proof as if it were sent by the node to
|
||||
// itself.
|
||||
let h = proof.root_hash.clone();
|
||||
if proof.validate(h.as_slice()) {
|
||||
// Save the leaf value for reconstructing the tree later.
|
||||
state.leaf_values[index_of_proof(&proof)] =
|
||||
Some(proof.value.clone().into_boxed_slice());
|
||||
state.leaf_values_num += 1;
|
||||
state.root_hash = Some(h);
|
||||
}
|
||||
|
||||
MessageLoopState::Processing(
|
||||
remote_messages
|
||||
.into_iter()
|
||||
.map(TargetedBroadcastMessage::into_remote_message)
|
||||
.collect(),
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, E> Handler<E> for Broadcast<messaging::NodeUid>
|
||||
where
|
||||
E: From<Error> + From<messaging::Error>,
|
||||
{
|
||||
fn handle(&self, m: QMessage, tx: Sender<QMessage>) -> Result<MessageLoopState, E> {
|
||||
self.on_message(m, &tx).map_err(E::from)
|
||||
}
|
||||
}
|
||||
|
||||
impl<NodeUid: Eq + Hash + Debug + Display + Clone> Broadcast<NodeUid> {
|
||||
pub fn new(uid: NodeUid, all_uids: HashSet<NodeUid>, num_nodes: usize) -> Result<Self, Error> {
|
||||
let num_faulty_nodes = (num_nodes - 1) / 3;
|
||||
let parity_shard_num = 2 * num_faulty_nodes;
|
||||
|
@ -68,38 +183,11 @@ impl Broadcast {
|
|||
})
|
||||
}
|
||||
|
||||
/// The message-driven interface function for calls from the main message
|
||||
/// loop.
|
||||
pub fn on_message<E>(&self, m: QMessage, tx: &Sender<QMessage>) -> Result<MessageLoopState, E>
|
||||
where
|
||||
E: From<Error> + From<messaging::Error>,
|
||||
{
|
||||
match m {
|
||||
QMessage::Local(LocalMessage { message, .. }) => match message {
|
||||
AlgoMessage::BroadcastInput(value) => self.on_local_message(value.to_owned()),
|
||||
|
||||
_ => Err(Error::UnexpectedMessage).map_err(E::from),
|
||||
},
|
||||
|
||||
QMessage::Remote(RemoteMessage {
|
||||
node: RemoteNode::Node(uid),
|
||||
message,
|
||||
}) => {
|
||||
if let Message::Broadcast(b) = message {
|
||||
self.on_remote_message(uid, &b, tx)
|
||||
} else {
|
||||
Err(Error::UnexpectedMessage).map_err(E::from)
|
||||
}
|
||||
}
|
||||
|
||||
_ => Err(Error::UnexpectedMessage).map_err(E::from),
|
||||
}
|
||||
}
|
||||
|
||||
/// Processes the proposed value input by broadcasting it.
|
||||
pub fn propose_value(&self, value: ProposedValue) ->
|
||||
Result<VecDeque<RemoteMessage>, Error>
|
||||
{
|
||||
pub fn propose_value(
|
||||
&self,
|
||||
value: ProposedValue,
|
||||
) -> Result<VecDeque<TargetedBroadcastMessage<NodeUid>>, Error> {
|
||||
let mut state = self.state.write().unwrap();
|
||||
// Split the value into chunks/shards, encode them with erasure codes.
|
||||
// Assemble a Merkle tree from data and parity shards. Take all proofs
|
||||
|
@ -107,45 +195,19 @@ impl Broadcast {
|
|||
self.send_shards(value)
|
||||
.map_err(Error::from)
|
||||
.map(|(proof, remote_messages)| {
|
||||
// Record the first proof as if it were sent by the node to
|
||||
// itself.
|
||||
let h = proof.root_hash.clone();
|
||||
if proof.validate(h.as_slice()) {
|
||||
// Save the leaf value for reconstructing the tree later.
|
||||
state.leaf_values[index_of_proof(&proof)] =
|
||||
Some(proof.value.clone().into_boxed_slice());
|
||||
state.leaf_values_num += 1;
|
||||
state.root_hash = Some(h);
|
||||
}
|
||||
// Record the first proof as if it were sent by the node to
|
||||
// itself.
|
||||
let h = proof.root_hash.clone();
|
||||
if proof.validate(h.as_slice()) {
|
||||
// Save the leaf value for reconstructing the tree later.
|
||||
state.leaf_values[index_of_proof(&proof)] =
|
||||
Some(proof.value.clone().into_boxed_slice());
|
||||
state.leaf_values_num += 1;
|
||||
state.root_hash = Some(h);
|
||||
}
|
||||
|
||||
remote_messages
|
||||
})
|
||||
}
|
||||
|
||||
/// FIXME: Deprecated. Processes the proposed value input by broadcasting
|
||||
/// it.
|
||||
pub fn on_local_message<E>(&self, value: ProposedValue) -> Result<MessageLoopState, E>
|
||||
where
|
||||
E: From<Error> + From<messaging::Error>,
|
||||
{
|
||||
let mut state = self.state.write().unwrap();
|
||||
// Split the value into chunks/shards, encode them with erasure codes.
|
||||
// Assemble a Merkle tree from data and parity shards. Take all proofs
|
||||
// from this tree and send them, each to its own node.
|
||||
self.send_shards(value).map(|(proof, remote_messages)| {
|
||||
// Record the first proof as if it were sent by the node to
|
||||
// itself.
|
||||
let h = proof.root_hash.clone();
|
||||
if proof.validate(h.as_slice()) {
|
||||
// Save the leaf value for reconstructing the tree later.
|
||||
state.leaf_values[index_of_proof(&proof)] =
|
||||
Some(proof.value.clone().into_boxed_slice());
|
||||
state.leaf_values_num += 1;
|
||||
state.root_hash = Some(h);
|
||||
}
|
||||
|
||||
MessageLoopState::Processing(remote_messages)
|
||||
}).map_err(E::from)
|
||||
remote_messages
|
||||
})
|
||||
}
|
||||
|
||||
/// Breaks the input value into shards of equal length and encodes them --
|
||||
|
@ -156,8 +218,13 @@ impl Broadcast {
|
|||
fn send_shards(
|
||||
&self,
|
||||
mut value: ProposedValue,
|
||||
) -> Result<(Proof<ProposedValue>, VecDeque<RemoteMessage>), Error>
|
||||
{
|
||||
) -> Result<
|
||||
(
|
||||
Proof<ProposedValue>,
|
||||
VecDeque<TargetedBroadcastMessage<NodeUid>>,
|
||||
),
|
||||
Error,
|
||||
> {
|
||||
let data_shard_num = self.coding.data_shard_count();
|
||||
let parity_shard_num = self.coding.parity_shard_count();
|
||||
|
||||
|
@ -218,9 +285,9 @@ impl Broadcast {
|
|||
result = Ok(proof);
|
||||
} else {
|
||||
// Rest of the proofs are sent to remote nodes.
|
||||
outgoing.push_back(RemoteMessage {
|
||||
node: RemoteNode::Node(uid),
|
||||
message: Message::Broadcast(BroadcastMessage::Value(proof)),
|
||||
outgoing.push_back(TargetedBroadcastMessage {
|
||||
target: BroadcastTarget::Node(uid),
|
||||
message: BroadcastMessage::Value(proof),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -229,41 +296,25 @@ impl Broadcast {
|
|||
result.map(|r| (r, outgoing))
|
||||
}
|
||||
|
||||
fn on_remote_message<E>(
|
||||
&self,
|
||||
uid: NodeUid,
|
||||
message: &BroadcastMessage<ProposedValue>,
|
||||
tx: &Sender<QMessage>,
|
||||
) -> Result<MessageLoopState, E>
|
||||
where
|
||||
E: From<Error> + From<messaging::Error>,
|
||||
{
|
||||
let (output, messages) = self.handle_broadcast_message::<E>(uid, message)?;
|
||||
if let Some(value) = output {
|
||||
tx.send(QMessage::Local(LocalMessage {
|
||||
dst: Algorithm::CommonSubset,
|
||||
message: AlgoMessage::BroadcastOutput(self.uid, value),
|
||||
})).map_err(Error::from)?;
|
||||
}
|
||||
Ok(MessageLoopState::Processing(messages))
|
||||
}
|
||||
|
||||
/// Handler of messages received from remote nodes.
|
||||
pub fn handle_broadcast_message<E>(
|
||||
pub fn handle_broadcast_message(
|
||||
&self,
|
||||
uid: NodeUid,
|
||||
uid: &NodeUid,
|
||||
message: &BroadcastMessage<ProposedValue>,
|
||||
) -> Result<(Option<ProposedValue>, VecDeque<RemoteMessage>), E>
|
||||
where
|
||||
E: From<Error> + From<messaging::Error>,
|
||||
{
|
||||
) -> Result<
|
||||
(
|
||||
Option<ProposedValue>,
|
||||
VecDeque<TargetedBroadcastMessage<NodeUid>>,
|
||||
),
|
||||
Error,
|
||||
> {
|
||||
let mut state = self.state.write().unwrap();
|
||||
let no_outgoing = VecDeque::new();
|
||||
|
||||
// A value received. Record the value and multicast an echo.
|
||||
match message {
|
||||
BroadcastMessage::Value(p) => {
|
||||
if uid != self.uid {
|
||||
if *uid != self.uid {
|
||||
// Ignore value messages from unrelated remote nodes.
|
||||
Ok((None, no_outgoing))
|
||||
} else {
|
||||
|
@ -288,9 +339,9 @@ impl Broadcast {
|
|||
}
|
||||
|
||||
// Enqueue a broadcast of an echo of this proof.
|
||||
let state = VecDeque::from(vec![RemoteMessage {
|
||||
node: RemoteNode::All,
|
||||
message: Message::Broadcast(BroadcastMessage::Echo(p.clone())),
|
||||
let state = VecDeque::from(vec![TargetedBroadcastMessage {
|
||||
target: BroadcastTarget::All,
|
||||
message: BroadcastMessage::Echo(p.clone()),
|
||||
}]);
|
||||
Ok((None, state))
|
||||
}
|
||||
|
@ -298,7 +349,7 @@ impl Broadcast {
|
|||
|
||||
// An echo received. Verify the proof it contains.
|
||||
BroadcastMessage::Echo(p) => {
|
||||
if state.root_hash.is_none() && uid == self.uid {
|
||||
if state.root_hash.is_none() && *uid == self.uid {
|
||||
state.root_hash = Some(p.root_hash.clone());
|
||||
debug!("Node {} Echo root hash {:?}", self.uid, state.root_hash);
|
||||
}
|
||||
|
@ -333,27 +384,21 @@ impl Broadcast {
|
|||
self.data_shard_num,
|
||||
h,
|
||||
);
|
||||
match result {
|
||||
Ok(_) => {
|
||||
// if Ready has not yet been sent, multicast
|
||||
// Ready
|
||||
if !state.ready_sent {
|
||||
state.ready_sent = true;
|
||||
result?;
|
||||
// if Ready has not yet been sent, multicast
|
||||
// Ready
|
||||
if !state.ready_sent {
|
||||
state.ready_sent = true;
|
||||
|
||||
Ok((
|
||||
None,
|
||||
VecDeque::from(vec![RemoteMessage {
|
||||
node: RemoteNode::All,
|
||||
message: Message::Broadcast(
|
||||
BroadcastMessage::Ready(h.to_owned()),
|
||||
),
|
||||
}]),
|
||||
))
|
||||
} else {
|
||||
Ok((None, no_outgoing))
|
||||
}
|
||||
}
|
||||
Err(e) => Err(E::from(e)),
|
||||
Ok((
|
||||
None,
|
||||
VecDeque::from(vec![TargetedBroadcastMessage {
|
||||
target: BroadcastTarget::All,
|
||||
message: BroadcastMessage::Ready(h.to_owned()),
|
||||
}]),
|
||||
))
|
||||
} else {
|
||||
Ok((None, no_outgoing))
|
||||
}
|
||||
} else {
|
||||
Ok((None, no_outgoing))
|
||||
|
@ -381,9 +426,9 @@ impl Broadcast {
|
|||
// has not yet been sent, multicast Ready(h).
|
||||
if (ready_num == self.num_faulty_nodes + 1) && !state.ready_sent {
|
||||
// Enqueue a broadcast of a ready message.
|
||||
outgoing.push_back(RemoteMessage {
|
||||
node: RemoteNode::All,
|
||||
message: Message::Broadcast(BroadcastMessage::Ready(h.to_vec())),
|
||||
outgoing.push_back(TargetedBroadcastMessage {
|
||||
target: BroadcastTarget::All,
|
||||
message: BroadcastMessage::Ready(h.to_vec()),
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -416,15 +461,6 @@ impl Broadcast {
|
|||
}
|
||||
}
|
||||
|
||||
impl<'a, E> Handler<E> for Broadcast
|
||||
where
|
||||
E: From<Error> + From<messaging::Error>,
|
||||
{
|
||||
fn handle(&self, m: QMessage, tx: Sender<QMessage>) -> Result<MessageLoopState, E> {
|
||||
self.on_message(m, &tx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Broadcast algorithm instance.
|
||||
///
|
||||
/// The ACS algorithm requires multiple broadcast instances running
|
||||
|
|
|
@ -5,11 +5,11 @@ use std::collections::{HashMap, HashSet, VecDeque};
|
|||
use std::sync::RwLock;
|
||||
|
||||
use broadcast;
|
||||
use broadcast::Broadcast;
|
||||
use broadcast::{Broadcast, TargetedBroadcastMessage};
|
||||
|
||||
use messaging;
|
||||
use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, RemoteMessage,
|
||||
MessageLoopState, NodeUid, QMessage, ProposedValue};
|
||||
use messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoopState, NodeUid,
|
||||
ProposedValue, QMessage, RemoteMessage};
|
||||
|
||||
pub enum CommonSubsetMessage {
|
||||
|
||||
|
@ -25,7 +25,7 @@ pub struct CommonSubset {
|
|||
uid: NodeUid,
|
||||
num_nodes: usize,
|
||||
num_faulty_nodes: usize,
|
||||
broadcast_instances: HashMap<NodeUid, Broadcast>,
|
||||
broadcast_instances: HashMap<NodeUid, Broadcast<NodeUid>>,
|
||||
state: RwLock<CommonSubsetState>,
|
||||
}
|
||||
|
||||
|
@ -49,19 +49,20 @@ impl CommonSubset {
|
|||
|
||||
/// Common Subset input message handler. It receives a value for broadcast
|
||||
/// and redirects it to the corresponding broadcast instance.
|
||||
pub fn on_message_input(&self, value: ProposedValue) ->
|
||||
Result<VecDeque<RemoteMessage>, Error>
|
||||
{
|
||||
pub fn on_message_input(&self, value: ProposedValue) -> Result<VecDeque<RemoteMessage>, Error> {
|
||||
// Upon receiving input v_i , input v_i to RBC_i. See Figure 2.
|
||||
if let Some(instance) = self.broadcast_instances.get(&self.uid) {
|
||||
instance.propose_value(value).map_err(Error::from)
|
||||
}
|
||||
else {
|
||||
Ok(instance
|
||||
.propose_value(value)?
|
||||
.into_iter()
|
||||
.map(TargetedBroadcastMessage::into_remote_message)
|
||||
.collect())
|
||||
} else {
|
||||
Err(Error::NoSuchBroadcastInstance(self.uid))
|
||||
}
|
||||
}
|
||||
}
|
||||
/*
|
||||
/*
|
||||
no_outgoing
|
||||
}
|
||||
|
||||
|
|
|
@ -6,217 +6,112 @@ extern crate log;
|
|||
extern crate crossbeam;
|
||||
extern crate crossbeam_channel;
|
||||
extern crate merkle;
|
||||
extern crate rand;
|
||||
extern crate simple_logger;
|
||||
|
||||
mod netsim;
|
||||
use rand::Rng;
|
||||
use std::collections::{HashSet, VecDeque};
|
||||
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use hbbft::broadcast::{Broadcast, BroadcastTarget, TargetedBroadcastMessage};
|
||||
use hbbft::messaging::ProposedValue;
|
||||
use hbbft::proto::BroadcastMessage;
|
||||
|
||||
use hbbft::broadcast;
|
||||
use hbbft::broadcast::Broadcast;
|
||||
use hbbft::common_subset;
|
||||
use hbbft::messaging;
|
||||
use hbbft::messaging::{AlgoMessage, Algorithm, Handler, LocalMessage, MessageLoop, NodeUid,
|
||||
ProposedValue, QMessage, RemoteMessage, RemoteNode};
|
||||
use hbbft::proto::*;
|
||||
|
||||
use netsim::NetSim;
|
||||
|
||||
/// This is a structure to start a consensus node.
|
||||
pub struct TestNode<'a> {
|
||||
/// Node identifier.
|
||||
uid: NodeUid,
|
||||
/// RX handle indexed with the transmitting node address. One handle for
|
||||
/// each other node.
|
||||
rxs: HashMap<NodeUid, Receiver<Message<ProposedValue>>>,
|
||||
/// Optionally, a value to be broadcast by this node.
|
||||
value: Option<ProposedValue>,
|
||||
/// Messaging system.
|
||||
message_loop: MessageLoop<'a, Error>,
|
||||
struct TestNode {
|
||||
broadcast: Broadcast<usize>,
|
||||
queue: VecDeque<(usize, BroadcastMessage<ProposedValue>)>,
|
||||
}
|
||||
|
||||
impl<'a> TestNode<'a> {
|
||||
/// Consensus node constructor. It only initialises initial parameters.
|
||||
pub fn new(
|
||||
uid: NodeUid,
|
||||
txs: HashMap<NodeUid, Sender<Message<ProposedValue>>>,
|
||||
rxs: HashMap<NodeUid, Receiver<Message<ProposedValue>>>,
|
||||
value: Option<ProposedValue>,
|
||||
) -> Self {
|
||||
impl TestNode {
|
||||
fn new(broadcast: Broadcast<usize>) -> TestNode {
|
||||
TestNode {
|
||||
uid,
|
||||
rxs,
|
||||
value,
|
||||
message_loop: MessageLoop::new(txs),
|
||||
broadcast,
|
||||
queue: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_handler<H: 'a + Handler<Error>>(&'a self, algo: Algorithm, handler: &'a H) {
|
||||
self.message_loop.insert_algo(algo, handler);
|
||||
}
|
||||
/// Creates `num` nodes, and returns the set of node IDs as well as the new `TestNode`s.
|
||||
fn create_test_nodes(num: usize) -> Vec<TestNode> {
|
||||
let node_ids: HashSet<usize> = (0..num).collect();
|
||||
(0..num)
|
||||
.map(|id| {
|
||||
TestNode::new(Broadcast::new(id, node_ids.clone(), num).expect("Instantiate broadcast"))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn run(&'a self) -> Result<HashSet<ProposedValue>, Error> {
|
||||
let tx = self.message_loop.queue_tx();
|
||||
|
||||
if let Some(value) = &self.value {
|
||||
// Start the broadcast value transmission.
|
||||
tx.send(QMessage::Local(LocalMessage {
|
||||
dst: Algorithm::Broadcast(self.uid),
|
||||
message: AlgoMessage::BroadcastInput(value.clone()),
|
||||
}))?;
|
||||
}
|
||||
|
||||
crossbeam::scope(|scope| {
|
||||
// Spawn receive loops for messages from simulated remote
|
||||
// nodes. Each loop receives a message from the simulated remote
|
||||
// node and forwards it to the local message loop having annotated
|
||||
// the message with the sender node UID.
|
||||
for (uid, rx) in &self.rxs {
|
||||
let tx = tx.clone();
|
||||
let self_uid = self.uid;
|
||||
scope.spawn(move || {
|
||||
debug!("Node {} receiver {} starting", self_uid, uid);
|
||||
while let Ok(message) = rx.recv() {
|
||||
// FIXME: error handling
|
||||
tx.send(QMessage::Remote(RemoteMessage {
|
||||
node: RemoteNode::Node(*uid),
|
||||
message,
|
||||
})).unwrap();
|
||||
/// Pushes the messages into the queues of the corresponding recipients.
|
||||
fn dispatch_messages(
|
||||
nodes: &mut Vec<TestNode>,
|
||||
sender_id: usize,
|
||||
msgs: VecDeque<TargetedBroadcastMessage<usize>>,
|
||||
) {
|
||||
for msg in msgs {
|
||||
match msg {
|
||||
TargetedBroadcastMessage {
|
||||
target: BroadcastTarget::All,
|
||||
message,
|
||||
} => {
|
||||
for (i, node) in nodes.iter_mut().enumerate() {
|
||||
if i != sender_id {
|
||||
node.queue.push_back((sender_id, message.clone()))
|
||||
}
|
||||
debug!("Node {} receiver {} terminated", self_uid, uid);
|
||||
});
|
||||
}
|
||||
// Start the local message loop.
|
||||
let _ = self.message_loop.run();
|
||||
});
|
||||
|
||||
Err(Error::NotImplemented)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum Error {
|
||||
Messaging(messaging::Error),
|
||||
Broadcast(broadcast::Error),
|
||||
CommonSubset(common_subset::Error),
|
||||
Send(crossbeam_channel::SendError<QMessage>),
|
||||
NotImplemented,
|
||||
}
|
||||
|
||||
impl From<messaging::Error> for Error {
|
||||
fn from(e: messaging::Error) -> Error {
|
||||
Error::Messaging(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<broadcast::Error> for Error {
|
||||
fn from(e: broadcast::Error) -> Error {
|
||||
Error::Broadcast(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<common_subset::Error> for Error {
|
||||
fn from(e: common_subset::Error) -> Error {
|
||||
Error::CommonSubset(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crossbeam_channel::SendError<QMessage>> for Error {
|
||||
fn from(e: crossbeam_channel::SendError<QMessage>) -> Error {
|
||||
Error::Send(e)
|
||||
}
|
||||
}
|
||||
|
||||
fn proposed_value(n: usize) -> ProposedValue {
|
||||
let b: u8 = (n & 0xff) as u8;
|
||||
vec![b; 10]
|
||||
}
|
||||
|
||||
fn node_addr(node_index: usize) -> NodeUid {
|
||||
format!("127.0.0.1:{}", node_index).parse().unwrap()
|
||||
}
|
||||
|
||||
/// Creates test nodes but does not run them.
|
||||
fn create_test_nodes(
|
||||
num_nodes: usize,
|
||||
net: &NetSim<Message<Vec<u8>>>,
|
||||
) -> HashMap<NodeUid, (TestNode, HashMap<NodeUid, Broadcast>)> {
|
||||
let mut nodes = HashMap::new();
|
||||
for n in 0..num_nodes {
|
||||
let value = proposed_value(n);
|
||||
let mut txs = HashMap::new();
|
||||
let mut rxs = HashMap::new();
|
||||
// Set up comms channels to other nodes.
|
||||
for m in 0..num_nodes {
|
||||
if n == m {
|
||||
// Skip the channel back to the node itself.
|
||||
continue;
|
||||
}
|
||||
let addr = node_addr(m);
|
||||
txs.insert(addr, net.tx(n, m));
|
||||
rxs.insert(addr, net.rx(m, n));
|
||||
}
|
||||
|
||||
let uid = node_addr(n);
|
||||
let all_uids: HashSet<NodeUid> = (0..num_nodes).into_iter().map(node_addr).collect();
|
||||
let all_uids_copy = all_uids.clone();
|
||||
|
||||
// Create a broadcast algorithm instance for each node.
|
||||
let mut broadcast_instances = HashMap::new();
|
||||
for uid in all_uids {
|
||||
match Broadcast::new(uid, all_uids_copy.clone(), num_nodes) {
|
||||
Ok(instance) => {
|
||||
broadcast_instances.insert(uid, instance);
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("{:?}", e);
|
||||
}
|
||||
}
|
||||
TargetedBroadcastMessage {
|
||||
target: BroadcastTarget::Node(to_id),
|
||||
message,
|
||||
} => nodes[to_id].queue.push_back((sender_id, message)),
|
||||
}
|
||||
|
||||
nodes.insert(
|
||||
uid,
|
||||
(
|
||||
TestNode::new(uid, txs, rxs, Some(value)),
|
||||
broadcast_instances,
|
||||
),
|
||||
);
|
||||
}
|
||||
nodes
|
||||
}
|
||||
|
||||
/// Handles a queued message in a randomly selected node.
|
||||
fn handle_message(nodes: &mut Vec<TestNode>) -> (usize, Option<ProposedValue>) {
|
||||
let ids: Vec<usize> = nodes
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(_, node)| !node.queue.is_empty())
|
||||
.map(|(id, _)| id)
|
||||
.collect();
|
||||
let id = *rand::thread_rng()
|
||||
.choose(&ids)
|
||||
.expect("no more messages in queue");
|
||||
let (from_id, msg) = nodes[id].queue.pop_front().expect("message not found");
|
||||
debug!("Handling {} -> {}: {:?}", from_id, id, msg);
|
||||
let (output, msgs) = nodes[id]
|
||||
.broadcast
|
||||
.handle_broadcast_message(&id, &msg)
|
||||
.expect("handling message");
|
||||
debug!("Sending: {:?}", msgs);
|
||||
dispatch_messages(nodes, id, msgs);
|
||||
(id, output)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_4_broadcast_nodes() {
|
||||
fn test_16_broadcast_nodes() {
|
||||
simple_logger::init_with_level(log::Level::Debug).unwrap();
|
||||
|
||||
const NUM_NODES: usize = 4;
|
||||
// Create 4 nodes.
|
||||
const NUM_NODES: usize = 16;
|
||||
let mut nodes = create_test_nodes(NUM_NODES);
|
||||
|
||||
let net: NetSim<Message<Vec<u8>>> = NetSim::new(NUM_NODES);
|
||||
let nodes = create_test_nodes(NUM_NODES, &net);
|
||||
let mut join_handles: HashMap<NodeUid, _> = HashMap::new();
|
||||
// Make node 0 propose a value.
|
||||
let proposed_value = b"Foo";
|
||||
let msgs = nodes[0]
|
||||
.broadcast
|
||||
.propose_value(proposed_value.to_vec())
|
||||
.expect("propose");
|
||||
dispatch_messages(&mut nodes, 0, msgs);
|
||||
|
||||
crossbeam::scope(|scope| {
|
||||
// Run the test nodes, each in its own thread.
|
||||
for (uid, (node, broadcast_instances)) in &nodes {
|
||||
join_handles.insert(
|
||||
*uid,
|
||||
scope.spawn(move || {
|
||||
// Register broadcast instance handlers with the message loop.
|
||||
for (instance_uid, instance) in broadcast_instances {
|
||||
node.add_handler(Algorithm::Broadcast(*instance_uid), instance);
|
||||
}
|
||||
debug!("Running {:?}", node.uid);
|
||||
node.run()
|
||||
}),
|
||||
);
|
||||
// Handle messages in random order until all nodes have output the proposed value.
|
||||
let mut received = 0;
|
||||
while received < NUM_NODES {
|
||||
let (id, output) = handle_message(&mut nodes);
|
||||
if let Some(value) = output {
|
||||
assert_eq!(value, proposed_value);
|
||||
received += 1;
|
||||
debug!("Node {} received", id);
|
||||
}
|
||||
|
||||
for (uid, join_handle) in join_handles {
|
||||
let result = join_handle.join();
|
||||
println!("Result of {}: {:?}", uid, result);
|
||||
}
|
||||
|
||||
println!("Finished");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,49 +0,0 @@
|
|||
//! Network simulator for testing without message serialisation. Socket
|
||||
//! connections between nodes are simulated using
|
||||
//! `crossbeam_channel::unbounded`.
|
||||
|
||||
extern crate crossbeam_channel;
|
||||
extern crate log;
|
||||
|
||||
use crossbeam_channel::{unbounded, Receiver, Sender};
|
||||
|
||||
pub struct NetSim<Message: Clone + Send + Sync> {
|
||||
/// The number of simulated nodes.
|
||||
num_nodes: usize,
|
||||
/// All TX handles
|
||||
txs: Vec<Sender<Message>>,
|
||||
/// All RX handles
|
||||
rxs: Vec<Receiver<Message>>,
|
||||
}
|
||||
|
||||
impl<Message: Clone + Send + Sync> NetSim<Message> {
|
||||
pub fn new(num_nodes: usize) -> Self {
|
||||
assert!(num_nodes > 1);
|
||||
// All channels of a totally connected network of size `num_nodes`.
|
||||
let channels: Vec<(Sender<Message>, Receiver<Message>)> =
|
||||
(0..num_nodes * num_nodes).map(|_| unbounded()).collect();
|
||||
let txs = channels.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
|
||||
let rxs = channels.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
|
||||
NetSim {
|
||||
num_nodes,
|
||||
txs,
|
||||
rxs,
|
||||
}
|
||||
}
|
||||
|
||||
/// The TX side of a channel from node `src` to node `dst`.
|
||||
pub fn tx(&self, src: usize, dst: usize) -> Sender<Message> {
|
||||
assert!(src < self.num_nodes);
|
||||
assert!(dst < self.num_nodes);
|
||||
|
||||
self.txs[src * self.num_nodes + dst].clone()
|
||||
}
|
||||
|
||||
/// The RX side of a channel from node `src` to node `dst`.
|
||||
pub fn rx(&self, src: usize, dst: usize) -> Receiver<Message> {
|
||||
assert!(src < self.num_nodes);
|
||||
assert!(dst < self.num_nodes);
|
||||
|
||||
self.rxs[src * self.num_nodes + dst].clone()
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue