use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::{self, Debug};
use std::hash::Hash;
use std::mem;
use std::sync::Arc;

use rand::{self, Rng};

use hbbft::crypto::{PublicKeySet, SecretKeySet};
use hbbft::messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};

/// A node identifier. In the tests, nodes are simply numbered.
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy, Serialize, Deserialize, Rand)]
pub struct NodeUid(pub usize);

/// A "node" running an instance of the algorithm `D`.
pub struct TestNode<D: DistAlgorithm> {
    /// This node's own ID.
    id: D::NodeUid,
    /// The instance of the broadcast algorithm.
    algo: D,
    /// Incoming messages from other nodes that this node has not yet handled, stored as
    /// `(sender, message)` pairs in arrival order.
    pub queue: VecDeque<(D::NodeUid, D::Message)>,
    /// The values this node has output so far.
    outputs: Vec<D::Output>,
}

impl<D: DistAlgorithm> TestNode<D> {
    /// Returns the list of outputs received by this node.
    pub fn outputs(&self) -> &[D::Output] {
        &self.outputs
    }

    /// Returns whether the algorithm has terminated.
    #[allow(unused)] // Not used in all tests.
    pub fn terminated(&self) -> bool {
        self.algo.terminated()
    }

    /// Inputs a value into the instance and records any outputs this produced.
    ///
    /// # Panics
    ///
    /// Panics if the algorithm rejects the input.
    pub fn input(&mut self, input: D::Input) {
        self.algo.input(input).expect("input");
        self.outputs.extend(self.algo.output_iter());
    }

    /// Returns the internal algorithm's instance.
    #[allow(unused)] // Not used in all tests.
    pub fn instance(&self) -> &D {
        &self.algo
    }

    /// Creates a new test node with the given broadcast instance.
    ///
    /// Outputs that the instance already holds at construction time are collected right away.
    fn new(mut algo: D) -> TestNode<D> {
        let outputs = algo.output_iter().collect();
        TestNode {
            // The ID is cloned before `algo` is moved into the struct.
            id: algo.our_id().clone(),
            algo,
            queue: VecDeque::new(),
            outputs,
        }
    }

    /// Handles the first message in the node's queue and records any resulting outputs.
    ///
    /// # Panics
    ///
    /// Panics if the queue is empty or if the algorithm fails to handle the message.
    fn handle_message(&mut self) {
        let (from_id, msg) = self.queue.pop_front().expect("message not found");
        debug!("Handling {:?} -> {:?}: {:?}", from_id, self.id, msg);
        self.algo
            .handle_message(&from_id, msg)
            .expect("handling message");
        self.outputs.extend(self.algo.output_iter());
    }

    /// Returns `true` if the node has no queued messages left to process.
    fn is_idle(&self) -> bool {
        self.queue.is_empty()
    }
}

/// A strategy for picking the next good node to handle a message.
pub enum MessageScheduler {
    /// Picks a random node.
    Random,
    /// Picks the first non-idle node.
    First,
}

impl MessageScheduler {
    /// Chooses a node to be the next one to handle a message.
    ///
    /// Only nodes with a non-empty queue are considered.
    ///
    /// # Panics
    ///
    /// Panics if every node in `nodes` is idle.
    pub fn pick_node<D: DistAlgorithm>(
        &self,
        nodes: &BTreeMap<D::NodeUid, TestNode<D>>,
    ) -> D::NodeUid {
        // Both strategies choose among the same candidates: the IDs of all non-idle nodes, in
        // the map's key order.
        let mut busy_ids = nodes
            .iter()
            .filter(|(_, node)| !node.is_idle())
            .map(|(id, _)| id.clone());
        match *self {
            MessageScheduler::First => busy_ids.next().expect("no more messages in queue"),
            MessageScheduler::Random => {
                let ids: Vec<D::NodeUid> = busy_ids.collect();
                rand::thread_rng()
                    .choose(&ids)
                    .expect("no more messages in queue")
                    .clone()
            }
        }
    }
}

/// A message combined with a sender.
pub struct MessageWithSender<D: DistAlgorithm> {
    /// The sender of the message.
    pub sender: <D as DistAlgorithm>::NodeUid,
    /// The targeted message (recipient and message body).
    pub tm: TargetedMessage<<D as DistAlgorithm>::Message, <D as DistAlgorithm>::NodeUid>,
}

// The Debug implementation cannot be derived automatically, possibly due to a compiler bug. For
// this reason, it is implemented manually here.
impl<D: DistAlgorithm> fmt::Debug for MessageWithSender<D> {
    /// Formats the sender and the message's target; the message body itself is not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "MessageWithSender {{ sender: {:?}, tm: {:?} }}",
            self.sender, self.tm.target
        )
    }
}

impl<D: DistAlgorithm> MessageWithSender<D> {
    /// Creates a new message with a sender.
    pub fn new(
        sender: D::NodeUid,
        tm: TargetedMessage<D::Message, D::NodeUid>,
    ) -> MessageWithSender<D> {
        MessageWithSender { sender, tm }
    }
}

/// An adversary that can control a set of nodes and pick the next good node to receive a message.
///
/// See `TestNetwork::step()` for a more detailed description of its capabilities.
pub trait Adversary<D: DistAlgorithm> {
    /// Chooses a node to be the next one to handle a message.
    ///
    /// Starvation is illegal, i.e. in every iteration a node that has pending incoming messages
    /// must be chosen.
    fn pick_node(&self, nodes: &BTreeMap<D::NodeUid, TestNode<D>>) -> D::NodeUid;

    /// Called when a node controlled by the adversary receives a message.
    fn push_message(&mut self, sender_id: D::NodeUid, msg: TargetedMessage<D::Message, D::NodeUid>);

    /// Produces a list of messages to be sent from the adversary's nodes.
    fn step(&mut self) -> Vec<MessageWithSender<D>>;

    /// Initialize an adversary. This function's primary purpose is to inform the adversary about
    /// some aspects of the network, such as which nodes they control.
    fn init(
        &mut self,
        _all_nodes: &BTreeMap<D::NodeUid, TestNode<D>>,
        _adv_nodes: &BTreeMap<D::NodeUid, Arc<NetworkInfo<D::NodeUid>>>,
    ) {
        // default: does nothing
    }
}

/// An adversary whose nodes never send any messages.
pub struct SilentAdversary {
    /// Decides which good node handles its next message.
    scheduler: MessageScheduler,
}

impl SilentAdversary {
    /// Creates a new silent adversary with the given message scheduler.
    pub fn new(scheduler: MessageScheduler) -> SilentAdversary {
        SilentAdversary { scheduler }
    }
}

impl<D: DistAlgorithm> Adversary<D> for SilentAdversary {
    fn pick_node(&self, nodes: &BTreeMap<D::NodeUid, TestNode<D>>) -> D::NodeUid {
        self.scheduler.pick_node(nodes)
    }

    fn push_message(&mut self, _: D::NodeUid, _: TargetedMessage<D::Message, D::NodeUid>) {
        // All messages are ignored.
    }

    fn step(&mut self) -> Vec<MessageWithSender<D>> {
        vec![] // No messages are sent.
    }
}

/// Return true with a certain `probability` ([0 .. 1.0]).
///
/// A `probability` of `0.0` never yields `true`; a `probability` of `1.0` always does.
///
/// # Panics
///
/// Panics if `probability` lies outside of `[0.0, 1.0]`.
fn randomly(probability: f32) -> bool {
    assert!(probability <= 1.0);
    assert!(probability >= 0.0);

    let mut rng = rand::thread_rng();
    // `gen_range` samples from the half-open interval `[0.0, 1.0)`. The comparison must be
    // strict: with `<=`, a sample of exactly `0.0` would make `randomly(0.0)` return `true`.
    rng.gen_range(0.0, 1.0) < probability
}

#[test]
fn test_randomly() {
    assert!(randomly(1.0));
    assert!(!randomly(0.0));
}

/// An adversary that performs naive replay attacks.
///
/// The adversary will randomly take a message that is sent to one of its nodes and re-send it to
/// a different node. Additionally, it will inject unrelated messages at random.
#[allow(unused)] // not used in all tests pub struct RandomAdversary { /// The underlying scheduler used scheduler: MessageScheduler, /// Node ids seen by the adversary. known_node_ids: Vec, /// Node ids under control of adversary known_adversarial_ids: Vec, /// Internal queue for messages to be returned on the next `Adversary::step()` call outgoing: Vec>, /// Generates random messages to be injected generator: F, /// Probability of a message replay p_replay: f32, /// Probability of a message injection p_inject: f32, } impl RandomAdversary { /// Creates a new random adversary instance. #[allow(unused)] pub fn new(p_replay: f32, p_inject: f32, generator: F) -> RandomAdversary { assert!( p_inject < 0.95, "injections are repeated, p_inject must be smaller than 0.95" ); RandomAdversary { // The random adversary, true to its name, always schedules randomly. scheduler: MessageScheduler::Random, known_node_ids: Vec::new(), known_adversarial_ids: Vec::new(), outgoing: Vec::new(), generator, p_replay, p_inject, } } } impl TargetedMessage> Adversary for RandomAdversary { fn init( &mut self, all_nodes: &BTreeMap>, nodes: &BTreeMap>>, ) { self.known_adversarial_ids = nodes.keys().cloned().collect(); self.known_node_ids = all_nodes.keys().cloned().collect(); } fn pick_node(&self, nodes: &BTreeMap>) -> D::NodeUid { // Just let the scheduler pick a node. self.scheduler.pick_node(nodes) } fn push_message(&mut self, _: D::NodeUid, msg: TargetedMessage) { // If we have not discovered the network topology yet, abort. if self.known_node_ids.is_empty() { return; } // only replay a message in some cases if !randomly(self.p_replay) { return; } let TargetedMessage { message, target } = msg; match target { Target::All => { // Ideally, we would want to handle broadcast messages as well; however the // adversary API is quite cumbersome at the moment in regards to access to the // network topology. 
To re-send a broadcast message from one of the attacker // controlled nodes, we would have to get a list of attacker controlled nodes // here and use a random one as the origin/sender, this is not done here. return; } Target::Node(our_node_id) => { // Choose a new target to send the message to. The unwrap never fails, because we // ensured that `known_node_ids` is non-empty earlier. let mut rng = rand::thread_rng(); let new_target_node = rng.choose(&self.known_node_ids).unwrap().clone(); // TODO: We could randomly broadcast it instead, if we had access to topology // information. self.outgoing.push(MessageWithSender::new( our_node_id, TargetedMessage { target: Target::Node(new_target_node), message, }, )); } } } fn step(&mut self) -> Vec> { // Clear messages. let mut tmp = Vec::new(); mem::swap(&mut tmp, &mut self.outgoing); // Possibly inject more messages: while randomly(self.p_inject) { let mut rng = rand::thread_rng(); // Pick a random adversarial node and create a message using the generator. if let Some(sender) = rng.choose(&self.known_adversarial_ids[..]) { let tm = (self.generator)(); // Add to outgoing queue. tmp.push(MessageWithSender::new(sender.clone(), tm)); } } if !tmp.is_empty() { println!("Injecting random messages: {:?}", tmp); } tmp } } /// A collection of `TestNode`s representing a network. /// /// Each `TestNetwork` type is tied to a specific adversary and a distributed algorithm. It consists /// of a set of nodes, some of which are controlled by the adversary and some of which may be /// observer nodes, as well as a set of threshold-cryptography public keys. /// /// In addition to being able to participate correctly in the network using his nodes, the /// adversary can: /// /// 1. Decide which node is the next one to make progress, /// 2. Send arbitrary messages to any node originating from one of the nodes they control. /// /// See the `step` function for details on actual operation of the network. 
pub struct TestNetwork, D: DistAlgorithm> where ::NodeUid: Hash, { pub nodes: BTreeMap>, pub observer: TestNode, pub adv_nodes: BTreeMap>>, pub pk_set: PublicKeySet, adversary: A, } impl, D: DistAlgorithm> TestNetwork where D::Message: Clone, { /// Creates a new network with `good_num` good nodes, and the given `adversary` controlling /// `adv_num` nodes. pub fn new( good_num: usize, adv_num: usize, adversary: G, new_algo: F, ) -> TestNetwork where F: Fn(Arc>) -> D, G: Fn(BTreeMap>>) -> A, { let mut rng = rand::thread_rng(); let sk_set = SecretKeySet::random(adv_num, &mut rng); let pk_set = sk_set.public_keys(); let node_ids: BTreeSet = (0..(good_num + adv_num)).map(NodeUid).collect(); let new_node_by_id = |NodeUid(i): NodeUid| { ( NodeUid(i), TestNode::new(new_algo(Arc::new(NetworkInfo::new( NodeUid(i), node_ids.clone(), sk_set.secret_key_share(i as u64), pk_set.clone(), )))), ) }; let new_adv_node_by_id = |NodeUid(i): NodeUid| { ( NodeUid(i), Arc::new(NetworkInfo::new( NodeUid(i), node_ids.clone(), sk_set.secret_key_share(i as u64), pk_set.clone(), )), ) }; let adv_nodes: BTreeMap>> = (good_num ..(good_num + adv_num)) .map(NodeUid) .map(new_adv_node_by_id) .collect(); let mut network = TestNetwork { nodes: (0..good_num).map(NodeUid).map(new_node_by_id).collect(), observer: new_node_by_id(NodeUid(good_num + adv_num)).1, adversary: adversary(adv_nodes.clone()), pk_set: pk_set.clone(), adv_nodes, }; // Inform the adversary about their nodes. network.adversary.init(&network.nodes, &network.adv_nodes); let msgs = network.adversary.step(); for MessageWithSender { sender, tm } in msgs { network.dispatch_messages(sender, vec![tm]); } let mut initial_msgs: Vec<(D::NodeUid, Vec<_>)> = Vec::new(); for (id, node) in &mut network.nodes { initial_msgs.push((*id, node.algo.message_iter().collect())); } for (id, msgs) in initial_msgs { network.dispatch_messages(id, msgs); } network } /// Pushes the messages into the queues of the corresponding recipients. 
fn dispatch_messages(&mut self, sender_id: NodeUid, msgs: Q) where Q: IntoIterator> + Debug, { for msg in msgs { match msg.target { Target::All => { for node in self.nodes.values_mut() { if node.id != sender_id { node.queue.push_back((sender_id, msg.message.clone())) } } self.observer .queue .push_back((sender_id, msg.message.clone())); self.adversary.push_message(sender_id, msg); } Target::Node(to_id) => { if self.adv_nodes.contains_key(&to_id) { self.adversary.push_message(sender_id, msg); } else if let Some(node) = self.nodes.get_mut(&to_id) { node.queue.push_back((sender_id, msg.message)); } else { warn!( "Unknown recipient {:?} for message: {:?}", to_id, msg.message ); } } } } while !self.observer.queue.is_empty() { self.observer.handle_message(); } } /// Performs one iteration of the network, consisting of the following steps: /// /// 1. Give the adversary a chance to send messages of his choosing through `Adversary::step()`, /// 2. Let the adversary pick a node that receives its next message through /// `Adversary::pick_node()`. /// /// Returns the node ID of the node that made progress. pub fn step(&mut self) -> NodeUid { // We let the adversary send out messages to any number of nodes. let msgs = self.adversary.step(); for MessageWithSender { sender, tm } in msgs { self.dispatch_messages(sender, Some(tm)); } // Now one node is chosen to make progress, we let the adversary decide which. let id = self.adversary.pick_node(&self.nodes); // The node handles the incoming message and creates new outgoing ones to be dispatched. let msgs: Vec<_> = { let node = self.nodes.get_mut(&id).unwrap(); // Ensure the adversary is playing fair by selecting a node that will result in actual // progress being made, otherwise `TestNode::handle_message()` will panic on `expect()` // with a much more cryptic error message. 
assert!( !node.is_idle(), "adversary illegally selected an idle node in pick_node()" ); node.handle_message(); node.algo.message_iter().collect() }; self.dispatch_messages(id, msgs); id } /// Inputs a value in node `id`. pub fn input(&mut self, id: NodeUid, value: D::Input) { let msgs: Vec<_> = { let node = self.nodes.get_mut(&id).expect("input instance"); node.input(value); node.algo.message_iter().collect() }; self.dispatch_messages(id, msgs); } /// Inputs a value in all nodes. #[allow(unused)] // Not used in all tests. pub fn input_all(&mut self, value: D::Input) where D::Input: Clone, { let ids: Vec = self.nodes.keys().cloned().collect(); for id in ids { self.input(id, value.clone()); } } }