dispatch targeted messages in the Common Subset message handling routine

This commit is contained in:
Vladimir Komendantskiy 2018-05-16 11:21:53 +01:00
parent 5f916c4d08
commit 21b898d8e0
1 changed files with 53 additions and 26 deletions

View File

@ -9,17 +9,16 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use hbbft::common_subset;
use hbbft::common_subset::CommonSubset;
use hbbft::messaging::TargetedMessage;
use hbbft::messaging::{Target, TargetedMessage};
type ProposedValue = Vec<u8>;
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct NodeUid(usize);
/// The queue of messages of a particular Common Subset instance received by a node.
type InputQueue = VecDeque<common_subset::Message<NodeUid>>;
/// The queue of messages output from a Common Subset instance.
type OutputQueue = VecDeque<TargetedMessage<common_subset::Message<NodeUid>, NodeUid>>;
/// The queue of messages of a particular Common Subset instance received by a node or output from a
/// Common Subset instance.
type MessageQueue = VecDeque<TargetedMessage<common_subset::Message<NodeUid>, NodeUid>>;
struct TestNode {
/// Sender ID.
@ -42,7 +41,7 @@ impl TestNode {
}
}
fn handle_message(&mut self) -> (Option<HashMap<NodeUid, Vec<u8>>>, OutputQueue) {
fn handle_message(&mut self) -> (Option<HashMap<NodeUid, Vec<u8>>>, MessageQueue) {
let (sender_id, message) = self.queue
.pop_front()
.expect("popping a message off the queue");
@ -77,15 +76,26 @@ impl TestNetwork {
}
}
fn dispatch_messages(&mut self, sender_id: NodeUid, messages: InputQueue) {
fn dispatch_messages(&mut self, sender_id: NodeUid, messages: MessageQueue) {
for message in messages {
for (id, node) in &mut self.nodes {
if *id != sender_id {
debug!(
"Dispatching from {:?} to {:?}: {:?}",
sender_id, id, message
);
node.queue.push_back((sender_id, message.clone()));
match message {
TargetedMessage {
target: Target::Node(id),
message,
} => {
let node = self.nodes.get_mut(&id).expect("finding recipient node");
node.queue.push_back((sender_id, message));
}
TargetedMessage {
target: Target::All,
message,
} => {
// Multicast the message to other nodes.
let _: Vec<()> = self.nodes
.iter_mut()
.filter(|(id, _)| **id != sender_id)
.map(|(_, node)| node.queue.push_back((sender_id, message.clone())))
.collect();
}
}
}
@ -117,10 +127,6 @@ impl TestNetwork {
fn step(&mut self) -> (NodeUid, Option<HashMap<NodeUid, ProposedValue>>) {
let sender_id = self.pick_node();
let (output, messages) = self.nodes.get_mut(&sender_id).unwrap().handle_message();
let messages = messages
.into_iter()
.map(|TargetedMessage { message, .. }| message)
.collect();
self.dispatch_messages(sender_id, messages);
(sender_id, output)
}
@ -133,13 +139,7 @@ impl TestNetwork {
.cs
.send_proposed_value(value)
.expect("send proposed value");
self.dispatch_messages(
sender_id,
messages
.into_iter()
.map(|TargetedMessage { message, .. }| message)
.collect(),
);
self.dispatch_messages(sender_id, messages);
}
}
@ -159,7 +159,7 @@ fn test_common_subset(mut network: TestNetwork) -> BTreeMap<NodeUid, TestNode> {
}
#[test]
fn test_common_subset_4_nodes() {
fn test_common_subset_4_nodes_same_proposed_value() {
let proposed_value = Vec::from("Fake news");
let all_ids: HashSet<NodeUid> = (0..4).map(NodeUid).collect();
let mut network = TestNetwork::new(&all_ids);
@ -179,3 +179,30 @@ fn test_common_subset_4_nodes() {
assert_eq!(node.decision, Some(expected_node_decision.clone()));
}
}
#[test]
fn test_common_subset_5_nodes_different_proposed_values() {
let proposed_values = vec![
Vec::from("Alpha"),
Vec::from("Bravo"),
Vec::from("Charlie"),
Vec::from("Delta"),
Vec::from("Echo"),
];
let all_ids: HashSet<NodeUid> = (0..5).map(NodeUid).collect();
let mut network = TestNetwork::new(&all_ids);
let expected_node_decisions: HashMap<NodeUid, ProposedValue> =
all_ids.into_iter().zip(proposed_values).collect();
// Nodes propose their values.
let _: Vec<()> = expected_node_decisions
.iter()
.map(|(id, value)| network.send_proposed_value(*id, value.clone()))
.collect();
let nodes = test_common_subset(network);
for node in nodes.values() {
assert_eq!(node.decision, Some(expected_node_decisions.clone()));
}
}