//! Common supertraits for consensus protocols. use std::collections::BTreeMap; use std::fmt::{Debug, Display}; use std::hash::Hash; use std::iter::once; use failure::Fail; use rand::Rng; use serde::{de::DeserializeOwned, Serialize}; use crate::fault_log::{Fault, FaultLog}; use crate::sender_queue::SenderQueueableMessage; use crate::{Target, TargetedMessage}; /// A transaction, user message, or other user data. pub trait Contribution: Eq + Debug + Hash + Send + Sync {} impl Contribution for C where C: Eq + Debug + Hash + Send + Sync {} /// A peer node's unique identifier. pub trait NodeIdT: Eq + Ord + Clone + Debug + Hash + Send + Sync {} impl NodeIdT for N where N: Eq + Ord + Clone + Debug + Hash + Send + Sync {} /// A consensus protocol fault. pub trait FaultT: Clone + Debug + Fail + PartialEq {} impl FaultT for N where N: Clone + Debug + Fail + PartialEq {} /// Messages. pub trait Message: Debug + Send + Sync {} impl Message for M where M: Debug + Send + Sync {} /// Session identifiers. pub trait SessionIdT: Display + Serialize + Send + Sync + Clone + Debug {} impl SessionIdT for S where S: Display + Serialize + Send + Sync + Clone + Debug {} /// Epochs. pub trait EpochT: Copy + Message + Default + Eq + Ord + Serialize + DeserializeOwned {} impl EpochT for E where E: Copy + Message + Default + Eq + Ord + Serialize + DeserializeOwned {} /// Single algorithm step outcome. /// /// Each time input (typically in the form of user input or incoming network messages) is provided /// to an instance of an algorithm, a `Step` is produced, potentially containing output values, /// a fault log, and network messages. /// /// Any `Step` **must always be used** by the client application; at the very least the resulting /// messages must be queued. /// /// ## Handling unused Steps /// /// In the (rare) case of a `Step` not being of any interest at all, instead of discarding it /// through `let _ = ...` or similar constructs, the implicit assumption should explicitly be /// checked instead: /// /// ```ignore /// assert!(alg.propose(123).expect("Could not propose value").is_empty(), /// "Algorithm will never output anything on first proposal"); /// ``` /// /// If an edge case occurs and outgoing messages are generated as a result, the `assert!` will /// catch it, instead of potentially stalling the algorithm. #[must_use = "The algorithm step result must be used."] #[derive(Debug)] pub struct Step { /// The algorithm's output, after consensus has been reached. This is guaranteed to be the same /// in all nodes. pub output: Vec, /// A list of nodes that are not following consensus, together with information about the /// detected misbehavior. pub fault_log: FaultLog, /// A list of messages that must be sent to other nodes. Each entry contains a message and a /// `Target`. pub messages: Vec>, } impl Default for Step where F: Fail, { fn default() -> Self { Step { output: Vec::default(), fault_log: FaultLog::default(), messages: Vec::default(), } } } impl Step where F: Fail, { /// Returns the same step, with the given additional output. pub fn with_output>>(mut self, output: T) -> Self { self.output.extend(output.into()); self } /// Converts `self` into a step of another type, given conversion methods for output, faults, /// and messages. pub fn map( self, f_out: FO, f_fail: FF, mut f_msg: FM, ) -> Step where F2: Fail, FO: FnMut(O) -> O2, FF: FnMut(F) -> F2, FM: FnMut(M) -> M2, { Step { output: self.output.into_iter().map(f_out).collect(), fault_log: self.fault_log.map(f_fail), messages: self .messages .into_iter() .map(|tm| tm.map(&mut f_msg)) .collect(), } } /// Extends `self` with `other`s messages and fault logs, and returns `other.output`. #[must_use] pub fn extend_with( &mut self, other: Step, f_fail: FF, mut f_msg: FM, ) -> Vec where F2: Fail, FF: FnMut(F2) -> F, FM: FnMut(M2) -> M, { let fails = other.fault_log.map(f_fail); self.fault_log.extend(fails); let msgs = other.messages.into_iter().map(|tm| tm.map(&mut f_msg)); self.messages.extend(msgs); other.output } /// Adds the outputs, fault logs and messages of `other` to `self`. pub fn extend(&mut self, other: Self) { self.output.extend(other.output); self.fault_log.extend(other.fault_log); self.messages.extend(other.messages); } /// Extends this step with `other` and returns the result. pub fn join(mut self, other: Self) -> Self { self.extend(other); self } /// Returns `true` if there are no messages, faults or outputs. pub fn is_empty(&self) -> bool { self.output.is_empty() && self.fault_log.is_empty() && self.messages.is_empty() } } impl From> for Step where F: Fail, { fn from(fault_log: FaultLog) -> Self { Step { fault_log, ..Step::default() } } } impl From> for Step where F: Fail, { fn from(fault: Fault) -> Self { Step { fault_log: fault.into(), ..Step::default() } } } impl From> for Step where F: Fail, { fn from(msg: TargetedMessage) -> Self { Step { messages: once(msg).collect(), ..Step::default() } } } impl From for Step where I: IntoIterator>, F: Fail, { fn from(msgs: I) -> Self { Step { messages: msgs.into_iter().collect(), ..Step::default() } } } /// An interface to objects with epoch numbers. Different algorithms may have different internal /// notion of _epoch_. This interface summarizes the properties that are essential for the message /// sender queue. pub trait Epoched { /// Type of epoch. type Epoch: EpochT; /// Returns the object's epoch number. fn epoch(&self) -> Self::Epoch; } /// An alias for the type of `Step` returned by `D`'s methods. pub type CpStep = Step< ::Message, ::Output, ::NodeId, ::FaultKind, >; impl<'i, M, O, N, F> Step where N: NodeIdT, M: 'i + Clone + SenderQueueableMessage, F: Fail, { /// Removes and returns any messages that are not yet accepted by remote nodes according to the /// mapping `remote_epochs`. This way the returned messages are postponed until later, and the /// remaining messages can be sent to remote nodes without delay. pub fn defer_messages( &mut self, peer_epochs: &BTreeMap, max_future_epochs: u64, ) -> Vec<(N, M)> { let mut deferred_msgs: Vec<(N, M)> = Vec::new(); let mut passed_msgs: Vec<_> = Vec::new(); for msg in self.messages.drain(..) { match msg.target.clone() { Target::Nodes(mut ids) => { let is_premature = |&them| msg.message.is_premature(them, max_future_epochs); let is_obsolete = |&them| msg.message.is_obsolete(them); for (id, them) in peer_epochs { if ids.contains(id) { if is_premature(them) { deferred_msgs.push((id.clone(), msg.message.clone())); ids.remove(id); } else if is_obsolete(them) { ids.remove(id); } } } if !ids.is_empty() { passed_msgs.push(Target::Nodes(ids).message(msg.message)); } } Target::AllExcept(mut exclude) => { let is_premature = |&them| msg.message.is_premature(them, max_future_epochs); let is_obsolete = |&them| msg.message.is_obsolete(them); for (id, them) in peer_epochs { if !exclude.contains(id) { if is_premature(them) { deferred_msgs.push((id.clone(), msg.message.clone())); exclude.insert(id.clone()); } else if is_obsolete(them) { exclude.insert(id.clone()); } } } passed_msgs.push(Target::AllExcept(exclude).message(msg.message)); } } } self.messages.extend(passed_msgs); deferred_msgs } } /// A consensus protocol that defines a message flow. /// /// Many algorithms require an RNG which must be supplied on each call. It is up to the caller to /// ensure that this random number generator is cryptographically secure. pub trait ConsensusProtocol: Send + Sync { /// Unique node identifier. type NodeId: NodeIdT; /// The input provided by the user. type Input; /// The output type. Some algorithms return an output exactly once, others return multiple /// times. type Output; /// The messages that need to be exchanged between the instances in the participating nodes. type Message: Message; /// The errors that can occur during execution. type Error: Fail; /// The kinds of message faults that can be detected during execution. type FaultKind: FaultT; /// Handles an input provided by the user, and returns fn handle_input( &mut self, input: Self::Input, rng: &mut R, ) -> Result, Self::Error> where Self: Sized; /// Handles a message received from node `sender_id`. fn handle_message( &mut self, sender_id: &Self::NodeId, message: Self::Message, rng: &mut R, ) -> Result, Self::Error> where Self: Sized; /// Returns `true` if execution has completed and this instance can be dropped. fn terminated(&self) -> bool; /// Returns this node's own ID. fn our_id(&self) -> &Self::NodeId; }