//! Integration tests of the Asynchronous Common Subset protocol. extern crate hbbft; #[macro_use] extern crate log; extern crate env_logger; use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use hbbft::common_subset; use hbbft::common_subset::CommonSubset; use hbbft::messaging::{DistAlgorithm, Target, TargetedMessage}; type ProposedValue = Vec; #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] struct NodeUid(usize); /// The queue of messages of a particular Common Subset instance received by a node or output from a /// Common Subset instance. type MessageQueue = VecDeque, NodeUid>>; struct TestNode { /// Sender ID. id: NodeUid, /// The Common Subset algorithm. cs: CommonSubset, /// Queue of tuples of a sender ID and a message. queue: VecDeque<(NodeUid, common_subset::Message)>, /// The output of the Common Subset algorithm, if there is one. decision: Option>, } impl TestNode { fn new(id: NodeUid, cs: CommonSubset) -> TestNode { TestNode { id, cs, queue: VecDeque::new(), decision: None, } } fn handle_message(&mut self) -> (Option>>, MessageQueue) { let (sender_id, message) = self.queue .pop_front() .expect("popping a message off the queue"); self.cs .handle_message(&sender_id, message) .expect("handling a Common Subset message"); let output = self.cs.next_output(); let messages = self.cs.message_iter().collect(); debug!("{:?} produced messages: {:?}", self.id, messages); if let Some(ref decision) = output { self.decision = Some(decision.clone()); } (output, messages) } } struct TestNetwork { nodes: BTreeMap, /// The next node to handle a message in its queue. scheduled_node_id: NodeUid, } impl TestNetwork { fn new(all_ids: &HashSet) -> TestNetwork { let num_nodes = all_ids.len(); // Make a node with an Agreement instance associated with the proposer node 0. let make_node = |id: NodeUid| { let cs = CommonSubset::new(id, all_ids).expect("Node creation"); (id, TestNode::new(id, cs)) }; TestNetwork { nodes: (0..num_nodes).map(NodeUid).map(make_node).collect(), scheduled_node_id: NodeUid(0), } } fn dispatch_messages(&mut self, sender_id: NodeUid, messages: MessageQueue) { for message in messages { match message { TargetedMessage { target: Target::Node(id), message, } => { let node = self.nodes.get_mut(&id).expect("finding recipient node"); node.queue.push_back((sender_id, message)); } TargetedMessage { target: Target::All, message, } => { // Multicast the message to other nodes. let _: Vec<()> = self.nodes .iter_mut() .filter(|(id, _)| **id != sender_id) .map(|(_, node)| node.queue.push_back((sender_id, message.clone()))) .collect(); } } } } // Gets a node for receiving a message and picks the next node with a // non-empty message queue in a cyclic order. fn pick_node(&mut self) -> NodeUid { let id = self.scheduled_node_id; // Try a node with a higher ID for fairness. if let Some(next_id) = self.nodes .iter() .find(|(&next_id, node)| id < next_id && !node.queue.is_empty()) .map(|(id, _)| *id) { self.scheduled_node_id = next_id; } else { // Fall back to nodes up to the currently scheduled ID. self.scheduled_node_id = self.nodes .iter() .find(|(&next_id, node)| id >= next_id && !node.queue.is_empty()) .map(|(id, _)| *id) .expect("no more messages in any node's queue") } debug!("Picked node {:?}", self.scheduled_node_id); id } fn step(&mut self) -> (NodeUid, Option>) { let sender_id = self.pick_node(); let (output, messages) = self.nodes.get_mut(&sender_id).unwrap().handle_message(); self.dispatch_messages(sender_id, messages); (sender_id, output) } /// Make Node 0 propose a value. fn send_proposed_value(&mut self, sender_id: NodeUid, value: ProposedValue) { let messages = { let cs = &mut self.nodes.get_mut(&sender_id).unwrap().cs; cs.send_proposed_value(value).expect("send proposed value"); cs.message_iter().collect() }; self.dispatch_messages(sender_id, messages); } } fn test_common_subset(mut network: TestNetwork) -> BTreeMap { let _ = env_logger::try_init(); // Pick the first node with a non-empty queue. network.pick_node(); while network.nodes.values().any(|node| !node.cs.terminated()) { let (NodeUid(id), output) = network.step(); if let Some(decision) = output { debug!("Node {} output {:?}", id, decision); } } network.nodes } #[test] fn test_common_subset_3_out_of_4_nodes_propose() { let proposed_value = Vec::from("Fake news"); let all_ids: HashSet = (0..4).map(NodeUid).collect(); let mut network = TestNetwork::new(&all_ids); let proposing_ids: HashSet = (0..3).map(NodeUid).collect(); let expected_node_decision: HashMap = proposing_ids .iter() .map(|id| (*id, proposed_value.clone())) .collect(); // Nodes propose values. let _: Vec<()> = proposing_ids .iter() .map(|id| network.send_proposed_value(*id, proposed_value.clone())) .collect(); let nodes = test_common_subset(network); for node in nodes.values() { assert_eq!(node.decision, Some(expected_node_decision.clone())); } } #[test] fn test_common_subset_5_nodes_different_proposed_values() { let proposed_values = vec![ Vec::from("Alpha"), Vec::from("Bravo"), Vec::from("Charlie"), Vec::from("Delta"), Vec::from("Echo"), ]; let all_ids: HashSet = (0..5).map(NodeUid).collect(); let mut network = TestNetwork::new(&all_ids); let expected_node_decisions: HashMap = all_ids.into_iter().zip(proposed_values).collect(); // Nodes propose their values. let _: Vec<()> = expected_node_decisions .iter() .map(|(id, value)| network.send_proposed_value(*id, value.clone())) .collect(); let nodes = test_common_subset(network); for node in nodes.values() { assert_eq!(node.decision, Some(expected_node_decisions.clone())); } }