diff --git a/src/agreement.rs b/src/agreement.rs index 7b92f7e..3e54d80 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -4,6 +4,7 @@ use itertools::Itertools; use std::collections::{BTreeSet, HashMap, VecDeque}; use std::fmt::Debug; use std::hash::Hash; +use std::mem; use messaging::{DistAlgorithm, Target, TargetedMessage}; @@ -17,6 +18,15 @@ pub enum AgreementMessage { Aux(u32, bool), } +impl AgreementMessage { + fn epoch(&self) -> u32 { + match *self { + AgreementMessage::BVal(epoch, _) => epoch, + AgreementMessage::Aux(epoch, _) => epoch, + } + } +} + /// Binary Agreement instance pub struct Agreement { /// The UID of the corresponding proposer node. @@ -44,6 +54,9 @@ pub struct Agreement { /// ever there at all. While the output value will still be required in a later epoch to decide /// the termination state. decision: Option, + /// A cache for messages for future epochs that cannot be handled yet. + // TODO: Find a better solution for this; defend against spam. + incoming_queue: Vec<(NodeUid, AgreementMessage)>, /// Termination flag. The Agreement instance doesn't terminate immediately /// upon deciding on the agreed value. This is done in order to help other /// nodes decide despite asynchrony of communication. Once the instance @@ -71,18 +84,20 @@ impl DistAlgorithm for Agreement Result<(), Self::Error> { + if self.terminated { + return Err(Error::Terminated); + } + if message.epoch() < self.epoch { + return Ok(()); // Message is obsolete: We are already in a later epoch. + } + if message.epoch() > self.epoch { + // Message is for a later epoch. We can't handle that yet. + self.incoming_queue.push((sender_id.clone(), message)); + return Ok(()); + } match message { - // The algorithm instance has already terminated. - _ if self.terminated => Err(Error::Terminated), - - AgreementMessage::BVal(epoch, b) if epoch == self.epoch => { - self.handle_bval(sender_id, b) - } - - AgreementMessage::Aux(epoch, b) if epoch == self.epoch => self.handle_aux(sender_id, b), - - // Epoch does not match. Ignore the message. - _ => Ok(()), + AgreementMessage::BVal(_, b) => self.handle_bval(sender_id, b), + AgreementMessage::Aux(_, b) => self.handle_aux(sender_id, b), } } @@ -108,7 +123,7 @@ impl DistAlgorithm for Agreement Agreement { +impl Agreement { pub fn new(uid: NodeUid, num_nodes: usize) -> Self { let num_faulty_nodes = (num_nodes - 1) / 3; @@ -124,6 +139,7 @@ impl Agreement { estimated: None, output: None, decision: None, + incoming_queue: Vec::new(), terminated: false, messages: VecDeque::new(), } @@ -134,20 +150,16 @@ impl Agreement { if self.epoch != 0 || self.estimated.is_some() { return Err(Error::InputNotAccepted); } + if self.num_nodes == 1 { + self.decision = Some(input); + self.output = Some(input); + self.terminated = true; + } // Set the initial estimated value to the input value. self.estimated = Some(input); // Record the input value as sent. - self.sent_bval.insert(input); - // Receive the BVAL message locally. - self.received_bval - .entry(self.uid.clone()) - .or_insert_with(BTreeSet::new) - .insert(input); - // Multicast BVAL - self.messages - .push_back(AgreementMessage::BVal(self.epoch, input)); - Ok(()) + self.send_bval(input) } /// Acceptance check to be performed before setting the input value. @@ -178,32 +190,35 @@ impl Agreement { self.messages .push_back(AgreementMessage::Aux(self.epoch, b)); // Receive the AUX message locally. - self.received_aux.insert(self.uid.clone(), b); + let our_uid = self.uid.clone(); + self.handle_aux(&our_uid, b)?; } - self.try_coin(); + self.try_coin()?; } // upon receiving BVAL_r(b) messages from f + 1 nodes, if // BVAL_r(b) has not been sent, multicast BVAL_r(b) else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) { - // Record the value `b` as sent. - self.sent_bval.insert(b); - // Receive the BVAL message locally. - self.received_bval - .entry(self.uid.clone()) - .or_insert_with(BTreeSet::new) - .insert(b); - // Multicast BVAL. - self.messages - .push_back(AgreementMessage::BVal(self.epoch, b)); + self.send_bval(b)?; } Ok(()) } + fn send_bval(&mut self, b: bool) -> Result<(), Error> { + // Record the value `b` as sent. + self.sent_bval.insert(b); + // Multicast BVAL. + self.messages + .push_back(AgreementMessage::BVal(self.epoch, b)); + // Receive the BVAL message locally. + let our_uid = self.uid.clone(); + self.handle_bval(&our_uid, b) + } + fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<(), Error> { self.received_aux.insert(sender_id.clone(), b); if !self.bin_values.is_empty() { - self.try_coin(); + self.try_coin()?; } Ok(()) } @@ -237,11 +252,11 @@ impl Agreement { /// to compute the next decision estimate and outputs the optional decision /// value. The function may start the next epoch. In that case, it also /// returns a message for broadcast. - fn try_coin(&mut self) { + fn try_coin(&mut self) -> Result<(), Error> { let (count_aux, vals) = self.count_aux(); if count_aux < self.num_nodes - self.num_faulty_nodes { // Continue waiting for the (N - f) AUX messages. - return; + return Ok(()); } debug!("{:?} try_coin in epoch {}", self.uid, self.epoch); @@ -255,6 +270,7 @@ impl Agreement { self.terminated = self.terminated || self.decision == Some(coin); if self.terminated { debug!("Agreement instance {:?} terminated", self.uid); + return Ok(()); } // Start the next epoch. @@ -286,9 +302,12 @@ impl Agreement { }; let b = self.estimated.unwrap(); - self.sent_bval.insert(b); - self.messages - .push_back(AgreementMessage::BVal(self.epoch, b)); + self.send_bval(b)?; + let queued_msgs = mem::replace(&mut self.incoming_queue, Vec::new()); + for (sender_id, msg) in queued_msgs { + self.handle_message(&sender_id, msg)?; + } + Ok(()) } } diff --git a/tests/agreement.rs b/tests/agreement.rs index 1799f6f..f75f3e4 100644 --- a/tests/agreement.rs +++ b/tests/agreement.rs @@ -13,176 +13,80 @@ //! //! TODO: Implement adversaries and send BVAL messages at different times. +extern crate env_logger; extern crate hbbft; #[macro_use] extern crate log; -extern crate env_logger; +extern crate rand; -use std::collections::{BTreeMap, VecDeque}; +mod network; -use hbbft::agreement::{Agreement, AgreementMessage}; -use hbbft::messaging::{DistAlgorithm, Target, TargetedMessage}; +use std::collections::BTreeSet; +use std::iter; -#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] -struct NodeUid(usize); +use rand::Rng; -/// The queue of messages of a particular Agreement instance. -type MessageQueue = VecDeque>; +use hbbft::agreement::Agreement; +use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork}; -struct TestNode { - /// Sender ID. - id: NodeUid, - /// The only agreement instance. - agreement: Agreement, - /// Queue of tuples of a sender ID and a message. - queue: VecDeque<(NodeUid, AgreementMessage)>, - /// All outputs - outputs: Vec, -} - -impl TestNode { - fn new(id: NodeUid, agreement: Agreement) -> TestNode { - TestNode { - id, - agreement, - queue: VecDeque::new(), - outputs: Vec::new(), - } +fn test_agreement>>( + mut network: TestNetwork>, + input: Option, +) { + let ids: Vec = network.nodes.keys().cloned().collect(); + for id in ids { + network.input(id, input.unwrap_or_else(rand::random)); } - fn handle_message(&mut self) -> (Option, MessageQueue) { - let output; - let (sender_id, message) = self.queue - .pop_front() - .expect("popping a message off the queue"); - self.agreement - .handle_message(&sender_id, message) - .expect("handling an agreement message"); - if let Some(b) = self.agreement.next_output() { - self.outputs.push(b); - output = Some(b); - } else { - output = None; - } - let messages = self.agreement.message_iter().collect(); - debug!("{:?} produced messages: {:?}", self.id, messages); - (output, messages) - } -} - -struct TestNetwork { - nodes: BTreeMap, - /// The next node to handle a message in its queue. - scheduled_node_id: NodeUid, -} - -impl TestNetwork { - fn new(num_nodes: usize) -> TestNetwork { - // Make a node with an Agreement instance associated with the proposer node 0. - let make_node = - |id: NodeUid| (id, TestNode::new(id, Agreement::new(NodeUid(0), num_nodes))); - TestNetwork { - nodes: (0..num_nodes).map(NodeUid).map(make_node).collect(), - scheduled_node_id: NodeUid(0), - } - } - - fn dispatch_messages(&mut self, sender_id: NodeUid, messages: MessageQueue) { - for message in messages { - match message { - TargetedMessage { - target: Target::Node(id), - message, - } => { - let node = self.nodes.get_mut(&id).expect("finding recipient node"); - node.queue.push_back((sender_id, message)); - } - TargetedMessage { - target: Target::All, - message, - } => { - // Multicast the message to other nodes. - let _: Vec<()> = self.nodes - .iter_mut() - .filter(|(id, _)| **id != sender_id) - .map(|(_, node)| node.queue.push_back((sender_id, message.clone()))) - .collect(); - } + // Handle messages in random order until all nodes have output the proposed value. + while network.nodes.values().any(|node| node.outputs().is_empty()) { + let id = network.step(); + if let Some(&b) = network.nodes[&id].outputs().iter().next() { + if let Some(expected) = input { + assert_eq!(expected, b); } + debug!("Node {:?} decided: {}", id, b); } } - - // Gets a node for receiving a message and picks the next node with a - // non-empty message queue in a cyclic order. - fn pick_node(&mut self) -> NodeUid { - let id = self.scheduled_node_id; - // Try a node with a higher ID for fairness. - if let Some(next_id) = self.nodes - .iter() - .find(|(&next_id, node)| id < next_id && !node.queue.is_empty()) - .map(|(id, _)| *id) - { - self.scheduled_node_id = next_id; - } else { - // Fall back to nodes up to the currently scheduled ID. - self.scheduled_node_id = self.nodes - .iter() - .find(|(&next_id, node)| id >= next_id && !node.queue.is_empty()) - .map(|(id, _)| *id) - .expect("no more messages in any node's queue") - } - debug!("Picked node {:?}", self.scheduled_node_id); - id - } - - fn step(&mut self) -> (NodeUid, Option) { - let sender_id = self.pick_node(); - let (output, messages) = self.nodes.get_mut(&sender_id).unwrap().handle_message(); - self.dispatch_messages(sender_id, messages); - (sender_id, output) - } - - fn set_input(&mut self, sender_id: NodeUid, input: bool) { - let messages = { - let instance = &mut self.nodes.get_mut(&sender_id).unwrap().agreement; - - instance.set_input(input).expect("set input"); - instance.message_iter().collect() - }; - - self.dispatch_messages(sender_id, messages); - } } -fn test_agreement(mut network: TestNetwork) -> BTreeMap { +fn test_agreement_different_sizes(new_adversary: F) +where + A: Adversary>, + F: Fn(usize, usize) -> A, +{ + // This returns an error in all but the first test. let _ = env_logger::try_init(); - // Pick the first node with a non-empty queue. - network.pick_node(); - - while network.nodes.values().any(|node| node.outputs.is_empty()) { - let (NodeUid(id), output) = network.step(); - if let Some(value) = output { - debug!("Node {} output {}", id, value); + let mut rng = rand::thread_rng(); + let sizes = (1..6) + .chain(iter::once(rng.gen_range(6, 20))) + .chain(iter::once(rng.gen_range(30, 50))); + for size in sizes { + let num_faulty_nodes = (size - 1) / 3; + let num_good_nodes = size - num_faulty_nodes; + info!( + "Network size: {} good nodes, {} faulty nodes", + num_good_nodes, num_faulty_nodes + ); + for &input in &[None, Some(false), Some(true)] { + let adversary = new_adversary(num_good_nodes, num_faulty_nodes); + let new_agreement = |id, all_ids: BTreeSet<_>| Agreement::new(id, all_ids.len()); + let network = + TestNetwork::new(num_good_nodes, num_faulty_nodes, adversary, new_agreement); + test_agreement(network, input); } } - network.nodes } -/// Test 4 correct nodes. One of the nodes, #3, hasn't finished broadcast yet -/// and gets an input 0 as a result. #[test] -fn test_agreement_and_validity_with_1_late_node() { - let mut network = TestNetwork::new(4); - - network.set_input(NodeUid(0), true); - network.set_input(NodeUid(1), true); - network.set_input(NodeUid(2), true); - network.set_input(NodeUid(3), false); - - let nodes = test_agreement(network); - - for node in nodes.values() { - assert_eq!(node.outputs, vec![true]); - } +fn test_agreement_random_silent_all_true() { + let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::Random); + test_agreement_different_sizes(new_adversary); +} + +#[test] +fn test_agreement_first_silent_all_true() { + let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::First); + test_agreement_different_sizes(new_adversary); } diff --git a/tests/broadcast.rs b/tests/broadcast.rs index c694198..35a7e3f 100644 --- a/tests/broadcast.rs +++ b/tests/broadcast.rs @@ -107,7 +107,7 @@ where for size in sizes { let num_faulty_nodes = (size - 1) / 3; let num_good_nodes = size - num_faulty_nodes; - println!( + info!( "Network size: {} good nodes, {} faulty nodes", num_good_nodes, num_faulty_nodes ); diff --git a/tests/honey_badger.rs b/tests/honey_badger.rs index 6449822..eb6abeb 100644 --- a/tests/honey_badger.rs +++ b/tests/honey_badger.rs @@ -97,8 +97,6 @@ fn test_honey_badger_random_delivery_silent() { test_honey_badger_different_sizes(new_adversary, 10); } -// TODO: This test is flaky. Debug. -#[ignore] #[test] fn test_honey_badger_first_delivery_silent() { let new_adversary = |_: usize, _: usize| SilentAdversary::new(MessageScheduler::First);