Make Step independent of DistAlgorithm.

This commit is contained in:
Andreas Fackler 2018-11-07 14:26:14 +01:00 committed by Andreas Fackler
parent 9d4a477835
commit 7f784e7852
22 changed files with 187 additions and 254 deletions

View File

@ -27,7 +27,7 @@ use signifix::{metric, TryFrom};
use hbbft::dynamic_honey_badger::DynamicHoneyBadger; use hbbft::dynamic_honey_badger::DynamicHoneyBadger;
use hbbft::queueing_honey_badger::{Batch, QueueingHoneyBadger}; use hbbft::queueing_honey_badger::{Batch, QueueingHoneyBadger};
use hbbft::sender_queue::{Message, SenderQueue}; use hbbft::sender_queue::{Message, SenderQueue};
use hbbft::{DistAlgorithm, NetworkInfo, Step, Target}; use hbbft::{DaStep, DistAlgorithm, NetworkInfo, Step, Target};
const VERSION: &str = env!("CARGO_PKG_VERSION"); const VERSION: &str = env!("CARGO_PKG_VERSION");
const USAGE: &str = " const USAGE: &str = "
@ -137,14 +137,14 @@ pub struct TestNode<D: DistAlgorithm> {
hw_quality: HwQuality, hw_quality: HwQuality,
} }
type TestNodeStepResult<D> = Step<D>; type TestNodeStepResult<D> = DaStep<D>;
impl<D: DistAlgorithm> TestNode<D> impl<D: DistAlgorithm> TestNode<D>
where where
D::Message: Serialize + DeserializeOwned, D::Message: Serialize + DeserializeOwned,
{ {
/// Creates a new test node with the given broadcast instance. /// Creates a new test node with the given broadcast instance.
fn new((algo, step): (D, Step<D>), hw_quality: HwQuality) -> TestNode<D> { fn new((algo, step): (D, DaStep<D>), hw_quality: HwQuality) -> TestNode<D> {
let out_queue = step let out_queue = step
.messages .messages
.into_iter() .into_iter()
@ -264,7 +264,7 @@ where
hw_quality: HwQuality, hw_quality: HwQuality,
) -> TestNetwork<D> ) -> TestNetwork<D>
where where
F: Fn(NetworkInfo<NodeId>) -> (D, Step<D>), F: Fn(NetworkInfo<NodeId>) -> (D, DaStep<D>),
{ {
let node_ids = (0..(good_num + adv_num)).map(NodeId); let node_ids = (0..(good_num + adv_num)).map(NodeId);
let netinfos = NetworkInfo::generate_map(node_ids, &mut rand::thread_rng()) let netinfos = NetworkInfo::generate_map(node_ids, &mut rand::thread_rng())

View File

@ -78,12 +78,12 @@ impl<N: NodeIdT, S: SessionIdT> DistAlgorithm for BinaryAgreement<N, S> {
type Message = Message; type Message = Message;
type Error = Error; type Error = Error;
fn handle_input(&mut self, input: Self::Input) -> Result<Step<N, S>> { fn handle_input(&mut self, input: Self::Input) -> Result<Step<N>> {
self.propose(input) self.propose(input)
} }
/// Receive input from a remote node. /// Receive input from a remote node.
fn handle_message(&mut self, sender_id: &Self::NodeId, msg: Message) -> Result<Step<N, S>> { fn handle_message(&mut self, sender_id: &Self::NodeId, msg: Message) -> Result<Step<N>> {
self.handle_message(sender_id, msg) self.handle_message(sender_id, msg)
} }
@ -122,20 +122,20 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
/// output. Otherwise either output is possible. /// output. Otherwise either output is possible.
/// ///
/// Note that if `can_propose` returns `false`, it is already too late to affect the outcome. /// Note that if `can_propose` returns `false`, it is already too late to affect the outcome.
pub fn propose(&mut self, input: bool) -> Result<Step<N, S>> { pub fn propose(&mut self, input: bool) -> Result<Step<N>> {
if !self.can_propose() { if !self.can_propose() {
return Ok(Step::default()); return Ok(Step::default());
} }
// Set the initial estimated value to the input value. // Set the initial estimated value to the input value.
self.estimated = Some(input); self.estimated = Some(input);
let sbvb_step = self.sbv_broadcast.handle_input(input)?; let sbvb_step = self.sbv_broadcast.send_bval(input)?;
self.handle_sbvb_step(sbvb_step) self.handle_sbvb_step(sbvb_step)
} }
/// Handles a message received from `sender_id`. /// Handles a message received from `sender_id`.
/// ///
/// This must be called with every message we receive from another node. /// This must be called with every message we receive from another node.
pub fn handle_message(&mut self, sender_id: &N, msg: Message) -> Result<Step<N, S>> { pub fn handle_message(&mut self, sender_id: &N, msg: Message) -> Result<Step<N>> {
let Message { epoch, content } = msg; let Message { epoch, content } = msg;
if self.decision.is_some() || (epoch < self.epoch && content.can_expire()) { if self.decision.is_some() || (epoch < self.epoch && content.can_expire()) {
// Message is obsolete: We are already in a later epoch or terminated. // Message is obsolete: We are already in a later epoch or terminated.
@ -161,9 +161,9 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
&mut self, &mut self,
sender_id: &N, sender_id: &N,
content: MessageContent, content: MessageContent,
) -> Result<Step<N, S>> { ) -> Result<Step<N>> {
match content { match content {
MessageContent::SbvBroadcast(msg) => self.handle_sbv_broadcast(sender_id, msg), MessageContent::SbvBroadcast(msg) => self.handle_sbv_broadcast(sender_id, &msg),
MessageContent::Conf(v) => self.handle_conf(sender_id, v), MessageContent::Conf(v) => self.handle_conf(sender_id, v),
MessageContent::Term(v) => self.handle_term(sender_id, v), MessageContent::Term(v) => self.handle_term(sender_id, v),
MessageContent::Coin(msg) => self.handle_coin(sender_id, *msg), MessageContent::Coin(msg) => self.handle_coin(sender_id, *msg),
@ -174,15 +174,15 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
fn handle_sbv_broadcast( fn handle_sbv_broadcast(
&mut self, &mut self,
sender_id: &N, sender_id: &N,
msg: sbv_broadcast::Message, msg: &sbv_broadcast::Message,
) -> Result<Step<N, S>> { ) -> Result<Step<N>> {
let sbvb_step = self.sbv_broadcast.handle_message(sender_id, msg)?; let sbvb_step = self.sbv_broadcast.handle_message(sender_id, &msg)?;
self.handle_sbvb_step(sbvb_step) self.handle_sbvb_step(sbvb_step)
} }
/// Handles a Synchronized Binary Value Broadcast step. On output, starts the `Conf` round or /// Handles a Synchronized Binary Value Broadcast step. On output, starts the `Conf` round or
/// decides. /// decides.
fn handle_sbvb_step(&mut self, sbvb_step: sbv_broadcast::Step<N>) -> Result<Step<N, S>> { fn handle_sbvb_step(&mut self, sbvb_step: sbv_broadcast::Step<N>) -> Result<Step<N>> {
let mut step = Step::default(); let mut step = Step::default();
let output = step.extend_with(sbvb_step, |msg| { let output = step.extend_with(sbvb_step, |msg| {
MessageContent::SbvBroadcast(msg).with_epoch(self.epoch) MessageContent::SbvBroadcast(msg).with_epoch(self.epoch)
@ -208,7 +208,7 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
/// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have /// Handles a `Conf` message. When _N - f_ `Conf` messages with values in `bin_values` have
/// been received, updates the epoch or decides. /// been received, updates the epoch or decides.
fn handle_conf(&mut self, sender_id: &N, v: BoolSet) -> Result<Step<N, S>> { fn handle_conf(&mut self, sender_id: &N, v: BoolSet) -> Result<Step<N>> {
self.received_conf.insert(sender_id.clone(), v); self.received_conf.insert(sender_id.clone(), v);
self.try_finish_conf_round() self.try_finish_conf_round()
} }
@ -216,7 +216,7 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
/// Handles a `Term(v)` message. If we haven't yet decided on a value and there are more than /// Handles a `Term(v)` message. If we haven't yet decided on a value and there are more than
/// _f_ such messages with the same value from different nodes, performs expedite termination: /// _f_ such messages with the same value from different nodes, performs expedite termination:
/// decides on `v`, broadcasts `Term(v)` and terminates the instance. /// decides on `v`, broadcasts `Term(v)` and terminates the instance.
fn handle_term(&mut self, sender_id: &N, b: bool) -> Result<Step<N, S>> { fn handle_term(&mut self, sender_id: &N, b: bool) -> Result<Step<N>> {
self.received_term[b].insert(sender_id.clone()); self.received_term[b].insert(sender_id.clone());
// Check for the expedite termination condition. // Check for the expedite termination condition.
if self.decision.is_some() { if self.decision.is_some() {
@ -234,7 +234,7 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
/// Handles a `ThresholdSign` message. If there is output, starts the next epoch. The function /// Handles a `ThresholdSign` message. If there is output, starts the next epoch. The function
/// may output a decision value. /// may output a decision value.
fn handle_coin(&mut self, sender_id: &N, msg: threshold_sign::Message) -> Result<Step<N, S>> { fn handle_coin(&mut self, sender_id: &N, msg: threshold_sign::Message) -> Result<Step<N>> {
let ts_step = match self.coin_state { let ts_step = match self.coin_state {
CoinState::Decided(_) => return Ok(Step::default()), // Coin value is already decided. CoinState::Decided(_) => return Ok(Step::default()), // Coin value is already decided.
CoinState::InProgress(ref mut ts) => ts CoinState::InProgress(ref mut ts) => ts
@ -245,7 +245,7 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
} }
/// Multicasts a `Conf(values)` message, and handles it. /// Multicasts a `Conf(values)` message, and handles it.
fn send_conf(&mut self, values: BoolSet) -> Result<Step<N, S>> { fn send_conf(&mut self, values: BoolSet) -> Result<Step<N>> {
if self.conf_values.is_some() { if self.conf_values.is_some() {
// Only one `Conf` message is allowed in an epoch. // Only one `Conf` message is allowed in an epoch.
return Ok(Step::default()); return Ok(Step::default());
@ -262,11 +262,11 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
} }
/// Multicasts and handles a message. Does nothing if we are only an observer. /// Multicasts and handles a message. Does nothing if we are only an observer.
fn send(&mut self, content: MessageContent) -> Result<Step<N, S>> { fn send(&mut self, content: MessageContent) -> Result<Step<N>> {
if !self.netinfo.is_validator() { if !self.netinfo.is_validator() {
return Ok(Step::default()); return Ok(Step::default());
} }
let step: Step<N, S> = Target::All let step: Step<N> = Target::All
.message(content.clone().with_epoch(self.epoch)) .message(content.clone().with_epoch(self.epoch))
.into(); .into();
let our_id = &self.our_id().clone(); let our_id = &self.our_id().clone();
@ -274,7 +274,7 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
} }
/// Handles a step returned from the `ThresholdSign`. /// Handles a step returned from the `ThresholdSign`.
fn on_coin_step(&mut self, ts_step: threshold_sign::Step<N>) -> Result<Step<N, S>> { fn on_coin_step(&mut self, ts_step: threshold_sign::Step<N>) -> Result<Step<N>> {
let mut step = Step::default(); let mut step = Step::default();
let epoch = self.epoch; let epoch = self.epoch;
let to_msg = |c_msg| MessageContent::Coin(Box::new(c_msg)).with_epoch(epoch); let to_msg = |c_msg| MessageContent::Coin(Box::new(c_msg)).with_epoch(epoch);
@ -293,7 +293,7 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
/// With two conf values, the next epoch's estimate is the coin value. If there is only one conf /// With two conf values, the next epoch's estimate is the coin value. If there is only one conf
/// value and that disagrees with the coin, the conf value is the next epoch's estimate. If /// value and that disagrees with the coin, the conf value is the next epoch's estimate. If
/// the unique conf value agrees with the coin, terminates and decides on that value. /// the unique conf value agrees with the coin, terminates and decides on that value.
fn try_update_epoch(&mut self) -> Result<Step<N, S>> { fn try_update_epoch(&mut self) -> Result<Step<N>> {
if self.decision.is_some() { if self.decision.is_some() {
// Avoid an infinite regression without making a Binary Agreement step. // Avoid an infinite regression without making a Binary Agreement step.
return Ok(Step::default()); return Ok(Step::default());
@ -330,7 +330,7 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
} }
/// Decides on a value and broadcasts a `Term` message with that value. /// Decides on a value and broadcasts a `Term` message with that value.
fn decide(&mut self, b: bool) -> Step<N, S> { fn decide(&mut self, b: bool) -> Step<N> {
if self.decision.is_some() { if self.decision.is_some() {
return Step::default(); return Step::default();
} }
@ -348,7 +348,7 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
} }
/// Checks whether the _N - f_ `Conf` messages have arrived, and if so, activates the coin. /// Checks whether the _N - f_ `Conf` messages have arrived, and if so, activates the coin.
fn try_finish_conf_round(&mut self) -> Result<Step<N, S>> { fn try_finish_conf_round(&mut self) -> Result<Step<N>> {
if self.conf_values.is_none() || self.count_conf() < self.netinfo.num_correct() { if self.conf_values.is_none() || self.count_conf() < self.netinfo.num_correct() {
return Ok(Step::default()); return Ok(Step::default());
} }
@ -368,7 +368,7 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
} }
/// Increments the epoch, sets the new estimate and handles queued messages. /// Increments the epoch, sets the new estimate and handles queued messages.
fn update_epoch(&mut self, b: bool) -> Result<Step<N, S>> { fn update_epoch(&mut self, b: bool) -> Result<Step<N>> {
self.sbv_broadcast.clear(&self.received_term); self.sbv_broadcast.clear(&self.received_term);
self.received_conf.clear(); self.received_conf.clear();
for (v, id) in &self.received_term { for (v, id) in &self.received_term {
@ -384,7 +384,7 @@ impl<N: NodeIdT, S: SessionIdT> BinaryAgreement<N, S> {
); );
self.estimated = Some(b); self.estimated = Some(b);
let sbvb_step = self.sbv_broadcast.handle_input(b)?; let sbvb_step = self.sbv_broadcast.send_bval(b)?;
let mut step = self.handle_sbvb_step(sbvb_step)?; let mut step = self.handle_sbvb_step(sbvb_step)?;
let queued_msgs = self let queued_msgs = self
.incoming_queue .incoming_queue

View File

@ -103,7 +103,7 @@ impl From<bincode::Error> for Error {
/// An Binary Agreement result. /// An Binary Agreement result.
pub type Result<T> = ::std::result::Result<T, Error>; pub type Result<T> = ::std::result::Result<T, Error>;
pub type Step<N, T> = ::Step<BinaryAgreement<N, T>>; pub type Step<N> = ::Step<Message, bool, N>;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum MessageContent { pub enum MessageContent {

View File

@ -16,11 +16,11 @@ use serde_derive::{Deserialize, Serialize};
use super::bool_multimap::BoolMultimap; use super::bool_multimap::BoolMultimap;
use super::bool_set::{self, BoolSet}; use super::bool_set::{self, BoolSet};
use super::{Error, Result}; use super::Result;
use fault_log::{Fault, FaultKind}; use fault_log::{Fault, FaultKind};
use {DistAlgorithm, NetworkInfo, NodeIdT, Target}; use {NetworkInfo, NodeIdT, Target};
pub type Step<N> = ::Step<SbvBroadcast<N>>; pub type Step<N> = ::Step<Message, BoolSet, N>;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum Message { pub enum Message {
@ -59,33 +59,6 @@ pub struct SbvBroadcast<N> {
terminated: bool, terminated: bool,
} }
impl<N: NodeIdT> DistAlgorithm for SbvBroadcast<N> {
type NodeId = N;
type Input = bool;
type Output = BoolSet;
type Message = Message;
type Error = Error;
fn handle_input(&mut self, input: Self::Input) -> Result<Step<N>> {
self.send_bval(input)
}
fn handle_message(&mut self, sender_id: &Self::NodeId, msg: Self::Message) -> Result<Step<N>> {
match msg {
Message::BVal(b) => self.handle_bval(sender_id, b),
Message::Aux(b) => self.handle_aux(sender_id, b),
}
}
fn terminated(&self) -> bool {
self.terminated
}
fn our_id(&self) -> &Self::NodeId {
self.netinfo.our_id()
}
}
impl<N: NodeIdT> SbvBroadcast<N> { impl<N: NodeIdT> SbvBroadcast<N> {
pub fn new(netinfo: Arc<NetworkInfo<N>>) -> Self { pub fn new(netinfo: Arc<NetworkInfo<N>>) -> Self {
SbvBroadcast { SbvBroadcast {
@ -108,6 +81,27 @@ impl<N: NodeIdT> SbvBroadcast<N> {
self.terminated = false; self.terminated = false;
} }
pub fn handle_message(&mut self, sender_id: &N, msg: &Message) -> Result<Step<N>> {
match msg {
Message::BVal(b) => self.handle_bval(sender_id, *b),
Message::Aux(b) => self.handle_aux(sender_id, *b),
}
}
/// Returns the current `bin_values`: the set of `b` for which _2 f + 1_ `BVal`s were received.
pub fn bin_values(&self) -> BoolSet {
self.bin_values
}
/// Multicasts a `BVal(b)` message, and handles it.
pub fn send_bval(&mut self, b: bool) -> Result<Step<N>> {
// Record the value `b` as sent. If it was already there, don't send it again.
if !self.sent_bval.insert(b) {
return Ok(Step::default());
}
self.send(&Message::BVal(b))
}
/// Handles a `BVal(b)` message. /// Handles a `BVal(b)` message.
/// ///
/// Upon receiving _f + 1_ `BVal(b)`, multicasts `BVal(b)`. Upon receiving _2 f + 1_ `BVal(b)`, /// Upon receiving _f + 1_ `BVal(b)`, multicasts `BVal(b)`. Upon receiving _2 f + 1_ `BVal(b)`,
@ -126,7 +120,7 @@ impl<N: NodeIdT> SbvBroadcast<N> {
self.bin_values.insert(b); self.bin_values.insert(b);
if self.bin_values != bool_set::BOTH { if self.bin_values != bool_set::BOTH {
step.extend(self.send(Message::Aux(b))?) // First entry: send `Aux(b)`. step.extend(self.send(&Message::Aux(b))?) // First entry: send `Aux(b)`.
} else { } else {
step.extend(self.try_output()?); // Otherwise just check for `Conf` condition. step.extend(self.try_output()?); // Otherwise just check for `Conf` condition.
} }
@ -139,28 +133,14 @@ impl<N: NodeIdT> SbvBroadcast<N> {
Ok(step) Ok(step)
} }
/// Returns the current `bin_values`: the set of `b` for which _2 f + 1_ `BVal`s were received.
pub fn bin_values(&self) -> BoolSet {
self.bin_values
}
/// Multicasts and handles a message. Does nothing if we are only an observer. /// Multicasts and handles a message. Does nothing if we are only an observer.
fn send(&mut self, msg: Message) -> Result<Step<N>> { fn send(&mut self, msg: &Message) -> Result<Step<N>> {
if !self.netinfo.is_validator() { if !self.netinfo.is_validator() {
return self.try_output(); return self.try_output();
} }
let step: Step<_> = Target::All.message(msg.clone()).into(); let step: Step<_> = Target::All.message(msg.clone()).into();
let our_id = &self.our_id().clone(); let our_id = &self.netinfo.our_id().clone();
Ok(step.join(self.handle_message(our_id, msg)?)) Ok(step.join(self.handle_message(our_id, &msg)?))
}
/// Multicasts a `BVal(b)` message, and handles it.
fn send_bval(&mut self, b: bool) -> Result<Step<N>> {
// Record the value `b` as sent. If it was already there, don't send it again.
if !self.sent_bval.insert(b) {
return Ok(Step::default());
}
self.send(Message::BVal(b))
} }
/// Handles an `Aux` message. /// Handles an `Aux` message.

View File

@ -37,7 +37,7 @@ pub struct Broadcast<N> {
readys: BTreeMap<N, Vec<u8>>, readys: BTreeMap<N, Vec<u8>>,
} }
pub type Step<N> = ::Step<Broadcast<N>>; pub type Step<N> = ::DaStep<Broadcast<N>>;
impl<N: NodeIdT> DistAlgorithm for Broadcast<N> { impl<N: NodeIdT> DistAlgorithm for Broadcast<N> {
type NodeId = N; type NodeId = N;

View File

@ -91,7 +91,7 @@ pub use self::change::{Change, ChangeState, NodeChange};
pub use self::dynamic_honey_badger::DynamicHoneyBadger; pub use self::dynamic_honey_badger::DynamicHoneyBadger;
pub use self::error::{Error, ErrorKind, Result}; pub use self::error::{Error, ErrorKind, Result};
pub type Step<C, N> = ::Step<DynamicHoneyBadger<C, N>>; pub type Step<C, N> = ::DaStep<DynamicHoneyBadger<C, N>>;
/// The user input for `DynamicHoneyBadger`. /// The user input for `DynamicHoneyBadger`.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]

View File

@ -19,7 +19,7 @@ use subset::{self as cs, Subset, SubsetOutput};
use threshold_decrypt::{self as td, ThresholdDecrypt}; use threshold_decrypt::{self as td, ThresholdDecrypt};
use {Contribution, DistAlgorithm, NetworkInfo, NodeIdT}; use {Contribution, DistAlgorithm, NetworkInfo, NodeIdT};
type CsStep<N> = cs::Step<N, EpochId>; type CsStep<N> = cs::Step<N>;
/// The status of an encrypted contribution. /// The status of an encrypted contribution.
#[derive(Debug)] #[derive(Debug)]

View File

@ -53,7 +53,7 @@ impl<C, N: Rand> Epoched for HoneyBadger<C, N> {
} }
} }
pub type Step<C, N> = ::Step<HoneyBadger<C, N>>; pub type Step<C, N> = ::DaStep<HoneyBadger<C, N>>;
impl<C, N> DistAlgorithm for HoneyBadger<C, N> impl<C, N> DistAlgorithm for HoneyBadger<C, N>
where where

View File

@ -153,4 +153,6 @@ pub use crypto::pairing;
pub use fault_log::{Fault, FaultKind, FaultLog}; pub use fault_log::{Fault, FaultKind, FaultLog};
pub use messaging::{SourcedMessage, Target, TargetedMessage}; pub use messaging::{SourcedMessage, Target, TargetedMessage};
pub use network_info::NetworkInfo; pub use network_info::NetworkInfo;
pub use traits::{Contribution, DistAlgorithm, Epoched, Message, NodeIdT, SessionIdT, Step}; pub use traits::{
Contribution, DaStep, DistAlgorithm, Epoched, Message, NodeIdT, SessionIdT, Step,
};

View File

@ -105,7 +105,7 @@ pub struct QueueingHoneyBadgerBuilder<T, N: Rand, Q> {
_phantom: PhantomData<T>, _phantom: PhantomData<T>,
} }
pub type QueueingHoneyBadgerWithStep<T, N, Q> = (QueueingHoneyBadger<T, N, Q>, Step<T, N, Q>); pub type QueueingHoneyBadgerWithStep<T, N, Q> = (QueueingHoneyBadger<T, N, Q>, Step<T, N>);
impl<T, N, Q> QueueingHoneyBadgerBuilder<T, N, Q> impl<T, N, Q> QueueingHoneyBadgerBuilder<T, N, Q>
where where
@ -187,7 +187,7 @@ pub struct QueueingHoneyBadger<T, N: Rand, Q> {
rng: Box<dyn Rng + Send + Sync>, rng: Box<dyn Rng + Send + Sync>,
} }
pub type Step<T, N, Q> = ::Step<QueueingHoneyBadger<T, N, Q>>; pub type Step<T, N> = ::Step<Message<N>, Batch<T, N>, N>;
impl<T, N, Q> DistAlgorithm for QueueingHoneyBadger<T, N, Q> impl<T, N, Q> DistAlgorithm for QueueingHoneyBadger<T, N, Q>
where where
@ -201,7 +201,7 @@ where
type Message = Message<N>; type Message = Message<N>;
type Error = Error; type Error = Error;
fn handle_input(&mut self, input: Self::Input) -> Result<Step<T, N, Q>> { fn handle_input(&mut self, input: Self::Input) -> Result<Step<T, N>> {
// User transactions are forwarded to `HoneyBadger` right away. Internal messages are // User transactions are forwarded to `HoneyBadger` right away. Internal messages are
// in addition signed and broadcast. // in addition signed and broadcast.
match input { match input {
@ -210,7 +210,7 @@ where
} }
} }
fn handle_message(&mut self, sender_id: &N, message: Self::Message) -> Result<Step<T, N, Q>> { fn handle_message(&mut self, sender_id: &N, message: Self::Message) -> Result<Step<T, N>> {
self.handle_message(sender_id, message) self.handle_message(sender_id, message)
} }
@ -243,7 +243,7 @@ where
/// If no proposal has yet been made for the current epoch, this may trigger one. In this case, /// If no proposal has yet been made for the current epoch, this may trigger one. In this case,
/// a nonempty step will returned, with the corresponding messages. (Or, if we are the only /// a nonempty step will returned, with the corresponding messages. (Or, if we are the only
/// validator, even with the completed batch as an output.) /// validator, even with the completed batch as an output.)
pub fn push_transaction(&mut self, tx: T) -> Result<Step<T, N, Q>> { pub fn push_transaction(&mut self, tx: T) -> Result<Step<T, N>> {
self.queue.extend(iter::once(tx)); self.queue.extend(iter::once(tx));
self.propose() self.propose()
} }
@ -252,12 +252,11 @@ where
/// ///
/// This stores a pending vote for the change. It will be included in some future batch, and /// This stores a pending vote for the change. It will be included in some future batch, and
/// once enough validators have been voted for the same change, it will take effect. /// once enough validators have been voted for the same change, it will take effect.
pub fn vote_for(&mut self, change: Change<N>) -> Result<Step<T, N, Q>> { pub fn vote_for(&mut self, change: Change<N>) -> Result<Step<T, N>> {
Ok(self Ok(self
.dyn_hb .dyn_hb
.handle_input(Input::Change(change)) .handle_input(Input::Change(change))
.map_err(ErrorKind::Input)? .map_err(ErrorKind::Input)?
.convert()
.join(self.propose()?)) .join(self.propose()?))
} }
@ -265,7 +264,7 @@ where
/// ///
/// This stores a pending vote for the change. It will be included in some future batch, and /// This stores a pending vote for the change. It will be included in some future batch, and
/// once enough validators have been voted for the same change, it will take effect. /// once enough validators have been voted for the same change, it will take effect.
pub fn vote_to_add(&mut self, node_id: N, pub_key: PublicKey) -> Result<Step<T, N, Q>> { pub fn vote_to_add(&mut self, node_id: N, pub_key: PublicKey) -> Result<Step<T, N>> {
self.vote_for(Change::NodeChange(NodeChange::Add(node_id, pub_key))) self.vote_for(Change::NodeChange(NodeChange::Add(node_id, pub_key)))
} }
@ -273,19 +272,18 @@ where
/// ///
/// This stores a pending vote for the change. It will be included in some future batch, and /// This stores a pending vote for the change. It will be included in some future batch, and
/// once enough validators have been voted for the same change, it will take effect. /// once enough validators have been voted for the same change, it will take effect.
pub fn vote_to_remove(&mut self, node_id: N) -> Result<Step<T, N, Q>> { pub fn vote_to_remove(&mut self, node_id: N) -> Result<Step<T, N>> {
self.vote_for(Change::NodeChange(NodeChange::Remove(node_id))) self.vote_for(Change::NodeChange(NodeChange::Remove(node_id)))
} }
/// Handles a message received from `sender_id`. /// Handles a message received from `sender_id`.
/// ///
/// This must be called with every message we receive from another node. /// This must be called with every message we receive from another node.
pub fn handle_message(&mut self, sender_id: &N, message: Message<N>) -> Result<Step<T, N, Q>> { pub fn handle_message(&mut self, sender_id: &N, message: Message<N>) -> Result<Step<T, N>> {
let step = self let step = self
.dyn_hb .dyn_hb
.handle_message(sender_id, message) .handle_message(sender_id, message)
.map_err(ErrorKind::HandleMessage)? .map_err(ErrorKind::HandleMessage)?;
.convert::<Self>();
for batch in &step.output { for batch in &step.output {
self.queue.remove_multiple(batch.iter()); self.queue.remove_multiple(batch.iter());
} }
@ -308,7 +306,7 @@ where
} }
/// Initiates the next epoch by proposing a batch from the queue. /// Initiates the next epoch by proposing a batch from the queue.
fn propose(&mut self) -> Result<Step<T, N, Q>> { fn propose(&mut self) -> Result<Step<T, N>> {
let mut step = Step::default(); let mut step = Step::default();
while self.can_propose() { while self.can_propose() {
let amount = cmp::max(1, self.batch_size / self.dyn_hb.netinfo().num_nodes()); let amount = cmp::max(1, self.batch_size / self.dyn_hb.netinfo().num_nodes());
@ -316,8 +314,7 @@ where
step.extend( step.extend(
self.dyn_hb self.dyn_hb
.handle_input(Input::User(proposal)) .handle_input(Input::User(proposal))
.map_err(ErrorKind::Propose)? .map_err(ErrorKind::Propose)?,
.convert(),
); );
} }
Ok(step) Ok(step)

View File

@ -1,17 +1,19 @@
//! Convenience methods for a `SenderQueue` wrapping a `DynamicHoneyBadger`. //! Convenience methods for a `SenderQueue` wrapping a `DynamicHoneyBadger`.
use std::result;
use crypto::PublicKey; use crypto::PublicKey;
use rand::Rand; use rand::Rand;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use super::{ use super::{
SenderQueue, SenderQueueableDistAlgorithm, SenderQueueableEpoch, SenderQueueableMessage, SenderQueue, SenderQueueableDistAlgorithm, SenderQueueableEpoch, SenderQueueableMessage,
SenderQueueableOutput, Step, SenderQueueableOutput,
}; };
use {Contribution, Epoched, NodeIdT}; use {Contribution, DaStep, Epoched, NodeIdT};
use dynamic_honey_badger::{ use dynamic_honey_badger::{
Batch, Change, ChangeState, DynamicHoneyBadger, Epoch, Message, NodeChange, Batch, Change, ChangeState, DynamicHoneyBadger, Epoch, Error as DhbError, Message, NodeChange,
}; };
impl<C, N> SenderQueueableOutput<N, Message<N>> for Batch<C, N> impl<C, N> SenderQueueableOutput<N, Message<N>> for Batch<C, N>
@ -90,7 +92,7 @@ where
} }
} }
type Result<C, N> = super::Result<Step<DynamicHoneyBadger<C, N>>, DynamicHoneyBadger<C, N>>; type Result<C, N> = result::Result<DaStep<SenderQueue<DynamicHoneyBadger<C, N>>>, DhbError>;
impl<C, N> SenderQueue<DynamicHoneyBadger<C, N>> impl<C, N> SenderQueue<DynamicHoneyBadger<C, N>>
where where

View File

@ -17,7 +17,7 @@ use std::fmt::Debug;
use rand::Rand; use rand::Rand;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use {DistAlgorithm, Epoched, NodeIdT, Target}; use {DaStep, DistAlgorithm, Epoched, NodeIdT, Target};
pub use self::message::Message; pub use self::message::Message;
@ -103,9 +103,7 @@ where
peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::LinEpoch>, peer_epochs: BTreeMap<D::NodeId, <D::Message as Epoched>::LinEpoch>,
} }
pub type Step<D> = ::Step<SenderQueue<D>>; pub type Step<D> = ::DaStep<SenderQueue<D>>;
pub type Result<T, D> = ::std::result::Result<T, <D as DistAlgorithm>::Error>;
impl<D> DistAlgorithm for SenderQueue<D> impl<D> DistAlgorithm for SenderQueue<D>
where where
@ -121,7 +119,7 @@ where
type Message = Message<D::Message>; type Message = Message<D::Message>;
type Error = D::Error; type Error = D::Error;
fn handle_input(&mut self, input: Self::Input) -> Result<Step<D>, D> { fn handle_input(&mut self, input: Self::Input) -> Result<DaStep<Self>, D::Error> {
self.handle_input(input) self.handle_input(input)
} }
@ -129,7 +127,7 @@ where
&mut self, &mut self,
sender_id: &D::NodeId, sender_id: &D::NodeId,
message: Self::Message, message: Self::Message,
) -> Result<Step<D>, D> { ) -> Result<DaStep<Self>, D::Error> {
self.handle_message(sender_id, message) self.handle_message(sender_id, message)
} }
@ -158,7 +156,7 @@ where
SenderQueueBuilder::new(algo, peer_ids) SenderQueueBuilder::new(algo, peer_ids)
} }
pub fn handle_input(&mut self, input: D::Input) -> Result<Step<D>, D> { pub fn handle_input(&mut self, input: D::Input) -> Result<DaStep<Self>, D::Error> {
self.apply(|algo| algo.handle_input(input)) self.apply(|algo| algo.handle_input(input))
} }
@ -166,7 +164,7 @@ where
&mut self, &mut self,
sender_id: &D::NodeId, sender_id: &D::NodeId,
message: Message<D::Message>, message: Message<D::Message>,
) -> Result<Step<D>, D> { ) -> Result<DaStep<Self>, D::Error> {
match message { match message {
Message::EpochStarted(lin_epoch) => Ok(self.handle_epoch_started(sender_id, lin_epoch)), Message::EpochStarted(lin_epoch) => Ok(self.handle_epoch_started(sender_id, lin_epoch)),
Message::Algo(msg) => self.handle_message_content(sender_id, msg), Message::Algo(msg) => self.handle_message_content(sender_id, msg),
@ -175,9 +173,9 @@ where
/// Applies `f` to the wrapped algorithm and converts the step in the result to a sender queue /// Applies `f` to the wrapped algorithm and converts the step in the result to a sender queue
/// step, deferring or dropping messages, where necessary. /// step, deferring or dropping messages, where necessary.
pub fn apply<F>(&mut self, f: F) -> Result<Step<D>, D> pub fn apply<F>(&mut self, f: F) -> Result<DaStep<Self>, D::Error>
where where
F: FnOnce(&mut D) -> Result<::Step<D>, D>, F: FnOnce(&mut D) -> Result<DaStep<D>, D::Error>,
{ {
let mut step = f(&mut self.algo)?; let mut step = f(&mut self.algo)?;
let mut sender_queue_step = self.update_lin_epoch(&step); let mut sender_queue_step = self.update_lin_epoch(&step);
@ -191,7 +189,7 @@ where
&mut self, &mut self,
sender_id: &D::NodeId, sender_id: &D::NodeId,
lin_epoch: <D::Message as Epoched>::LinEpoch, lin_epoch: <D::Message as Epoched>::LinEpoch,
) -> Step<D> { ) -> DaStep<Self> {
self.peer_epochs self.peer_epochs
.entry(sender_id.clone()) .entry(sender_id.clone())
.and_modify(|e| { .and_modify(|e| {
@ -225,7 +223,7 @@ where
&mut self, &mut self,
sender_id: &D::NodeId, sender_id: &D::NodeId,
epoch: <D::Message as Epoched>::Epoch, epoch: <D::Message as Epoched>::Epoch,
) -> Step<D> { ) -> DaStep<Self> {
// Send any HB messages for the HB epoch. // Send any HB messages for the HB epoch.
let mut ready_messages = self let mut ready_messages = self
.outgoing_queue .outgoing_queue
@ -239,7 +237,7 @@ where
.unwrap_or_default(), .unwrap_or_default(),
); );
} }
Step::from( Step::<D>::from(
ready_messages ready_messages
.into_iter() .into_iter()
.map(|msg| Target::Node(sender_id.clone()).message(Message::Algo(msg))), .map(|msg| Target::Node(sender_id.clone()).message(Message::Algo(msg))),
@ -251,12 +249,12 @@ where
&mut self, &mut self,
sender_id: &D::NodeId, sender_id: &D::NodeId,
content: D::Message, content: D::Message,
) -> Result<Step<D>, D> { ) -> Result<DaStep<Self>, D::Error> {
self.apply(|algo| algo.handle_message(sender_id, content)) self.apply(|algo| algo.handle_message(sender_id, content))
} }
/// Updates the current Honey Badger epoch. /// Updates the current Honey Badger epoch.
fn update_lin_epoch(&mut self, step: &::Step<D>) -> Step<D> { fn update_lin_epoch(&mut self, step: &DaStep<D>) -> DaStep<Self> {
// Look up `DynamicHoneyBadger` epoch updates and collect any added peers. // Look up `DynamicHoneyBadger` epoch updates and collect any added peers.
let new_epoch = step.output.iter().fold(self.lin_epoch, |lin_epoch, batch| { let new_epoch = step.output.iter().fold(self.lin_epoch, |lin_epoch, batch| {
let max_epoch = lin_epoch.max(batch.next_epoch()); let max_epoch = lin_epoch.max(batch.next_epoch());
@ -276,7 +274,7 @@ where
.message(Message::EpochStarted(self.lin_epoch)) .message(Message::EpochStarted(self.lin_epoch))
.into() .into()
} else { } else {
Step::default() Step::<D>::default()
} }
} }
@ -284,7 +282,7 @@ where
/// decomposing a `Target::All` message into `Target::Node` messages and sending some of the /// decomposing a `Target::All` message into `Target::Node` messages and sending some of the
/// resulting messages while placing onto the queue those remaining messages whose recipient is /// resulting messages while placing onto the queue those remaining messages whose recipient is
/// currently at an earlier epoch. /// currently at an earlier epoch.
fn defer_messages(&mut self, step: &mut ::Step<D>) { fn defer_messages(&mut self, step: &mut DaStep<D>) {
let max_future_epochs = self.algo.max_future_epochs(); let max_future_epochs = self.algo.max_future_epochs();
// Append the deferred messages onto the queues. // Append the deferred messages onto the queues.
for (id, message) in step.defer_messages(&self.peer_epochs, max_future_epochs) { for (id, message) in step.defer_messages(&self.peer_epochs, max_future_epochs) {
@ -355,7 +353,7 @@ where
self self
} }
pub fn build(self, our_id: D::NodeId) -> (SenderQueue<D>, Step<D>) { pub fn build(self, our_id: D::NodeId) -> (SenderQueue<D>, DaStep<SenderQueue<D>>) {
let lin_epoch = <D::Message as Epoched>::LinEpoch::default(); let lin_epoch = <D::Message as Epoched>::LinEpoch::default();
let sq = SenderQueue { let sq = SenderQueue {
algo: self.algo, algo: self.algo,
@ -364,7 +362,7 @@ where
outgoing_queue: self.outgoing_queue, outgoing_queue: self.outgoing_queue,
peer_epochs: self.peer_epochs, peer_epochs: self.peer_epochs,
}; };
let step: Step<D> = Target::All.message(Message::EpochStarted(lin_epoch)).into(); let step = Target::All.message(Message::EpochStarted(lin_epoch)).into();
(sq, step) (sq, step)
} }
} }

View File

@ -1,13 +1,15 @@
//! Convenience methods for a `SenderQueue` wrapping a `QueueingHoneyBadger`. //! Convenience methods for a `SenderQueue` wrapping a `QueueingHoneyBadger`.
use std::result;
use crypto::PublicKey; use crypto::PublicKey;
use rand::Rand; use rand::Rand;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use super::{SenderQueue, SenderQueueableDistAlgorithm, Step}; use super::{SenderQueue, SenderQueueableDistAlgorithm};
use queueing_honey_badger::{Change, QueueingHoneyBadger}; use queueing_honey_badger::{Change, Error as QhbError, QueueingHoneyBadger};
use transaction_queue::TransactionQueue; use transaction_queue::TransactionQueue;
use {Contribution, NodeIdT}; use {Contribution, DaStep, NodeIdT};
impl<T, N, Q> SenderQueueableDistAlgorithm for QueueingHoneyBadger<T, N, Q> impl<T, N, Q> SenderQueueableDistAlgorithm for QueueingHoneyBadger<T, N, Q>
where where
@ -20,8 +22,7 @@ where
} }
} }
type Result<T, N, Q> = type Result<T, N, Q> = result::Result<DaStep<SenderQueue<QueueingHoneyBadger<T, N, Q>>>, QhbError>;
super::Result<Step<QueueingHoneyBadger<T, N, Q>>, QueueingHoneyBadger<T, N, Q>>;
impl<T, N, Q> SenderQueue<QueueingHoneyBadger<T, N, Q>> impl<T, N, Q> SenderQueue<QueueingHoneyBadger<T, N, Q>>
where where

View File

@ -6,13 +6,13 @@ use super::{Error, MessageContent, Result};
use binary_agreement; use binary_agreement;
use broadcast::{self, Broadcast}; use broadcast::{self, Broadcast};
use rand::Rand; use rand::Rand;
use {DistAlgorithm, NetworkInfo, NodeIdT, SessionIdT}; use {NetworkInfo, NodeIdT, SessionIdT};
type BaInstance<N, S> = binary_agreement::BinaryAgreement<N, BaSessionId<S>>; type BaInstance<N, S> = binary_agreement::BinaryAgreement<N, BaSessionId<S>>;
type ValueAndStep<N, S> = (Option<Vec<u8>>, Step<N, S>); type ValueAndStep<N> = (Option<Vec<u8>>, Step<N>);
type BaResult<N, S> = binary_agreement::Result<binary_agreement::Step<N, BaSessionId<S>>>; type BaResult<N> = binary_agreement::Result<binary_agreement::Step<N>>;
pub type Step<N, S> = ::Step<ProposalState<N, S>>; pub type Step<N> = ::Step<MessageContent, Vec<u8>, N>;
/// The state of a proposal's broadcast and agreement process. /// The state of a proposal's broadcast and agreement process.
#[derive(Debug)] #[derive(Debug)]
@ -28,30 +28,6 @@ pub enum ProposalState<N: Rand, S> {
Complete(bool), Complete(bool),
} }
impl<N: NodeIdT + Rand, S: SessionIdT> DistAlgorithm for ProposalState<N, S> {
type NodeId = N;
type Input = Vec<u8>;
type Output = Vec<u8>;
type Message = MessageContent;
type Error = Error;
fn handle_input(&mut self, input: Self::Input) -> Result<Step<N, S>> {
self.propose(input)
}
fn handle_message(&mut self, sender_id: &N, message: MessageContent) -> Result<Step<N, S>> {
self.handle_message(sender_id, message)
}
fn terminated(&self) -> bool {
self.complete()
}
fn our_id(&self) -> &Self::NodeId {
unreachable!() // We don't actually need `DistAlgorithm`, just `Step`.
}
}
impl<N: NodeIdT + Rand, S: SessionIdT> ProposalState<N, S> { impl<N: NodeIdT + Rand, S: SessionIdT> ProposalState<N, S> {
/// Creates a new `ProposalState::Ongoing`, with a fresh broadcast and agreement instance. /// Creates a new `ProposalState::Ongoing`, with a fresh broadcast and agreement instance.
pub fn new(netinfo: Arc<NetworkInfo<N>>, ba_id: BaSessionId<S>, prop_id: N) -> Result<Self> { pub fn new(netinfo: Arc<NetworkInfo<N>>, ba_id: BaSessionId<S>, prop_id: N) -> Result<Self> {
@ -89,12 +65,12 @@ impl<N: NodeIdT + Rand, S: SessionIdT> ProposalState<N, S> {
} }
/// Makes a proposal by broadcasting a value. /// Makes a proposal by broadcasting a value.
pub fn propose(&mut self, value: Vec<u8>) -> Result<Step<N, S>> { pub fn propose(&mut self, value: Vec<u8>) -> Result<Step<N>> {
self.transition(|state| state.handle_broadcast(|bc| bc.broadcast(value))) self.transition(|state| state.handle_broadcast(|bc| bc.broadcast(value)))
} }
/// Handles a message received from `sender_id`. /// Handles a message received from `sender_id`.
pub fn handle_message(&mut self, sender_id: &N, msg: MessageContent) -> Result<Step<N, S>> { pub fn handle_message(&mut self, sender_id: &N, msg: MessageContent) -> Result<Step<N>> {
self.transition(|state| match msg { self.transition(|state| match msg {
MessageContent::Agreement(ba_msg) => { MessageContent::Agreement(ba_msg) => {
state.handle_agreement(|ba| ba.handle_message(sender_id, ba_msg)) state.handle_agreement(|ba| ba.handle_message(sender_id, ba_msg))
@ -106,12 +82,12 @@ impl<N: NodeIdT + Rand, S: SessionIdT> ProposalState<N, S> {
} }
/// Votes for rejecting the proposal, if still possible. /// Votes for rejecting the proposal, if still possible.
pub fn vote_false(&mut self) -> Result<Step<N, S>> { pub fn vote_false(&mut self) -> Result<Step<N>> {
self.transition(|state| state.handle_agreement(|ba| ba.propose(false))) self.transition(|state| state.handle_agreement(|ba| ba.propose(false)))
} }
/// Applies `f` to the `Broadcast` instance, and updates the state according to the outcome. /// Applies `f` to the `Broadcast` instance, and updates the state according to the outcome.
fn handle_broadcast<F>(self, f: F) -> (Self, Result<Step<N, S>>) fn handle_broadcast<F>(self, f: F) -> (Self, Result<Step<N>>)
where where
F: FnOnce(&mut Broadcast<N>) -> broadcast::Result<broadcast::Step<N>>, F: FnOnce(&mut Broadcast<N>) -> broadcast::Result<broadcast::Step<N>>,
{ {
@ -137,9 +113,9 @@ impl<N: NodeIdT + Rand, S: SessionIdT> ProposalState<N, S> {
/// Applies `f` to the `BinaryAgreement` instance, and updates the state according to the /// Applies `f` to the `BinaryAgreement` instance, and updates the state according to the
/// outcome. /// outcome.
fn handle_agreement<F>(self, f: F) -> (Self, Result<Step<N, S>>) fn handle_agreement<F>(self, f: F) -> (Self, Result<Step<N>>)
where where
F: FnOnce(&mut BaInstance<N, S>) -> BaResult<N, S>, F: FnOnce(&mut BaInstance<N, S>) -> BaResult<N>,
{ {
use self::ProposalState::*; use self::ProposalState::*;
match self { match self {
@ -160,7 +136,7 @@ impl<N: NodeIdT + Rand, S: SessionIdT> ProposalState<N, S> {
} }
/// Converts a `Broadcast` result and returns the output, if there was one. /// Converts a `Broadcast` result and returns the output, if there was one.
fn convert_bc(result: broadcast::Result<broadcast::Step<N>>) -> Result<ValueAndStep<N, S>> { fn convert_bc(result: broadcast::Result<broadcast::Step<N>>) -> Result<ValueAndStep<N>> {
let bc_step = result.map_err(Error::HandleBroadcast)?; let bc_step = result.map_err(Error::HandleBroadcast)?;
let mut step = Step::default(); let mut step = Step::default();
let opt_value = step.extend_with(bc_step, MessageContent::Broadcast).pop(); let opt_value = step.extend_with(bc_step, MessageContent::Broadcast).pop();
@ -168,7 +144,7 @@ impl<N: NodeIdT + Rand, S: SessionIdT> ProposalState<N, S> {
} }
/// Converts a `BinaryAgreement` step and returns the output, if there was one. /// Converts a `BinaryAgreement` step and returns the output, if there was one.
fn convert_ba(result: BaResult<N, S>) -> Result<(Option<bool>, Step<N, S>)> { fn convert_ba(result: BaResult<N>) -> Result<(Option<bool>, Step<N>)> {
let ba_step = result.map_err(Error::HandleAgreement)?; let ba_step = result.map_err(Error::HandleAgreement)?;
let mut step = Step::default(); let mut step = Step::default();
let opt_decision = step.extend_with(ba_step, MessageContent::Agreement).pop(); let opt_decision = step.extend_with(ba_step, MessageContent::Agreement).pop();
@ -176,9 +152,9 @@ impl<N: NodeIdT + Rand, S: SessionIdT> ProposalState<N, S> {
} }
/// Applies the given transition to `self`. /// Applies the given transition to `self`.
fn transition<F>(&mut self, f: F) -> Result<Step<N, S>> fn transition<F>(&mut self, f: F) -> Result<Step<N>>
where where
F: FnOnce(Self) -> (Self, Result<Step<N, S>>), F: FnOnce(Self) -> (Self, Result<Step<N>>),
{ {
// Temporary value: We need to take ownership of the state to make it transition. // Temporary value: We need to take ownership of the state to make it transition.
let (new_state, result) = f(mem::replace(self, ProposalState::Complete(false))); let (new_state, result) = f(mem::replace(self, ProposalState::Complete(false)));

View File

@ -12,7 +12,7 @@ use super::{Error, Message, MessageContent, Result};
use rand::Rand; use rand::Rand;
use {util, DistAlgorithm, NetworkInfo, NodeIdT, SessionIdT}; use {util, DistAlgorithm, NetworkInfo, NodeIdT, SessionIdT};
pub type Step<N, S> = ::Step<Subset<N, S>>; pub type Step<N> = ::Step<Message<N>, SubsetOutput<N>, N>;
#[derive(Derivative, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Derivative, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derivative(Debug)] #[derivative(Debug)]
@ -44,11 +44,11 @@ impl<N: NodeIdT + Rand, S: SessionIdT> DistAlgorithm for Subset<N, S> {
type Message = Message<N>; type Message = Message<N>;
type Error = Error; type Error = Error;
fn handle_input(&mut self, input: Self::Input) -> Result<Step<N, S>> { fn handle_input(&mut self, input: Self::Input) -> Result<Step<N>> {
self.propose(input) self.propose(input)
} }
fn handle_message(&mut self, sender_id: &N, message: Message<N>) -> Result<Step<N, S>> { fn handle_message(&mut self, sender_id: &N, message: Message<N>) -> Result<Step<N>> {
self.handle_message(sender_id, message) self.handle_message(sender_id, message)
} }
@ -90,7 +90,7 @@ impl<N: NodeIdT + Rand, S: SessionIdT> Subset<N, S> {
/// Proposes a value for the subset. /// Proposes a value for the subset.
/// ///
/// Returns an error if we already made a proposal. /// Returns an error if we already made a proposal.
pub fn propose(&mut self, value: Vec<u8>) -> Result<Step<N, S>> { pub fn propose(&mut self, value: Vec<u8>) -> Result<Step<N>> {
if !self.netinfo.is_validator() { if !self.netinfo.is_validator() {
return Ok(Step::default()); return Ok(Step::default());
} }
@ -107,7 +107,7 @@ impl<N: NodeIdT + Rand, S: SessionIdT> Subset<N, S> {
/// Handles a message received from `sender_id`. /// Handles a message received from `sender_id`.
/// ///
/// This must be called with every message we receive from another node. /// This must be called with every message we receive from another node.
pub fn handle_message(&mut self, sender_id: &N, msg: Message<N>) -> Result<Step<N, S>> { pub fn handle_message(&mut self, sender_id: &N, msg: Message<N>) -> Result<Step<N>> {
let prop_step = self let prop_step = self
.proposal_states .proposal_states
.get_mut(&msg.proposer_id) .get_mut(&msg.proposer_id)
@ -123,7 +123,7 @@ impl<N: NodeIdT + Rand, S: SessionIdT> Subset<N, S> {
self.proposal_states.values().filter(received).count() self.proposal_states.values().filter(received).count()
} }
fn convert_step(proposer_id: &N, prop_step: ProposalStep<N, S>) -> Step<N, S> { fn convert_step(proposer_id: &N, prop_step: ProposalStep<N>) -> Step<N> {
let from_p_msg = |p_msg: MessageContent| p_msg.with(proposer_id.clone()); let from_p_msg = |p_msg: MessageContent| p_msg.with(proposer_id.clone());
let mut step = Step::default(); let mut step = Step::default();
if let Some(value) = step.extend_with(prop_step, from_p_msg).pop() { if let Some(value) = step.extend_with(prop_step, from_p_msg).pop() {
@ -141,7 +141,7 @@ impl<N: NodeIdT + Rand, S: SessionIdT> Subset<N, S> {
/// Checks the voting and termination conditions: If enough proposals have been accepted, votes /// Checks the voting and termination conditions: If enough proposals have been accepted, votes
/// "no" for the remaining ones. If all proposals have been decided, outputs `Done`. /// "no" for the remaining ones. If all proposals have been decided, outputs `Done`.
fn try_output(&mut self) -> Result<Step<N, S>> { fn try_output(&mut self) -> Result<Step<N>> {
if self.decided || self.count_accepted() < self.netinfo.num_correct() { if self.decided || self.count_accepted() < self.netinfo.num_correct() {
return Ok(Step::default()); return Ok(Step::default());
} }

View File

@ -80,7 +80,7 @@ pub struct ThresholdDecrypt<N> {
terminated: bool, terminated: bool,
} }
pub type Step<N> = ::Step<ThresholdDecrypt<N>>; pub type Step<N> = ::DaStep<ThresholdDecrypt<N>>;
impl<N: NodeIdT> DistAlgorithm for ThresholdDecrypt<N> { impl<N: NodeIdT> DistAlgorithm for ThresholdDecrypt<N> {
type NodeId = N; type NodeId = N;

View File

@ -75,7 +75,7 @@ pub struct ThresholdSign<N> {
terminated: bool, terminated: bool,
} }
pub type Step<N> = ::Step<ThresholdSign<N>>; pub type Step<N> = ::DaStep<ThresholdSign<N>>;
impl<N: NodeIdT> DistAlgorithm for ThresholdSign<N> { impl<N: NodeIdT> DistAlgorithm for ThresholdSign<N> {
type NodeId = N; type NodeId = N;

View File

@ -57,22 +57,14 @@ impl<E> EpochT for E where E: Copy + Message + Default + Eq + Ord + Serialize +
/// catch it, instead of potentially stalling the algorithm. /// catch it, instead of potentially stalling the algorithm.
#[must_use = "The algorithm step result must be used."] #[must_use = "The algorithm step result must be used."]
#[derive(Debug)] #[derive(Debug)]
pub struct Step<D> pub struct Step<M, O, N> {
where pub output: Vec<O>,
D: DistAlgorithm, pub fault_log: FaultLog<N>,
<D as DistAlgorithm>::NodeId: NodeIdT, pub messages: Vec<TargetedMessage<M, N>>,
{
pub output: Vec<D::Output>,
pub fault_log: FaultLog<D::NodeId>,
pub messages: Vec<TargetedMessage<D::Message, D::NodeId>>,
} }
impl<D> Default for Step<D> impl<M, O, N> Default for Step<M, O, N> {
where fn default() -> Self {
D: DistAlgorithm,
<D as DistAlgorithm>::NodeId: NodeIdT,
{
fn default() -> Step<D> {
Step { Step {
output: Vec::default(), output: Vec::default(),
fault_log: FaultLog::default(), fault_log: FaultLog::default(),
@ -81,15 +73,12 @@ where
} }
} }
impl<D: DistAlgorithm> Step<D> impl<M, O, N> Step<M, O, N> {
where
<D as DistAlgorithm>::NodeId: NodeIdT,
{
/// Creates a new `Step` from the given collections. /// Creates a new `Step` from the given collections.
pub fn new( pub fn new(
output: Vec<D::Output>, output: Vec<O>,
fault_log: FaultLog<D::NodeId>, fault_log: FaultLog<N>,
messages: Vec<TargetedMessage<D::Message, D::NodeId>>, messages: Vec<TargetedMessage<M, N>>,
) -> Self { ) -> Self {
Step { Step {
output, output,
@ -99,18 +88,17 @@ where
} }
/// Returns the same step, with the given additional output. /// Returns the same step, with the given additional output.
pub fn with_output<T: Into<Option<D::Output>>>(mut self, output: T) -> Self { pub fn with_output<T: Into<Option<O>>>(mut self, output: T) -> Self {
self.output.extend(output.into()); self.output.extend(output.into());
self self
} }
/// Converts `self` into a step of another type, given conversion methods for output and /// Converts `self` into a step of another type, given conversion methods for output and
/// messages. /// messages.
pub fn map<D2, FO, FM>(self, f_out: FO, f_msg: FM) -> Step<D2> pub fn map<M2, O2, FO, FM>(self, f_out: FO, f_msg: FM) -> Step<M2, O2, N>
where where
D2: DistAlgorithm<NodeId = D::NodeId>, FO: Fn(O) -> O2,
FO: Fn(D::Output) -> D2::Output, FM: Fn(M) -> M2,
FM: Fn(D::Message) -> D2::Message,
{ {
Step { Step {
output: self.output.into_iter().map(f_out).collect(), output: self.output.into_iter().map(f_out).collect(),
@ -120,10 +108,9 @@ where
} }
/// Extends `self` with `other`s messages and fault logs, and returns `other.output`. /// Extends `self` with `other`s messages and fault logs, and returns `other.output`.
pub fn extend_with<D2, FM>(&mut self, other: Step<D2>, f_msg: FM) -> Vec<D2::Output> pub fn extend_with<M2, O2, FM>(&mut self, other: Step<M2, O2, N>, f_msg: FM) -> Vec<O2>
where where
D2: DistAlgorithm<NodeId = D::NodeId>, FM: Fn(M2) -> M,
FM: Fn(D2::Message) -> D::Message,
{ {
self.fault_log.extend(other.fault_log); self.fault_log.extend(other.fault_log);
let msgs = other.messages.into_iter().map(|tm| tm.map(&f_msg)); let msgs = other.messages.into_iter().map(|tm| tm.map(&f_msg));
@ -144,27 +131,14 @@ where
self self
} }
/// Converts this step into an equivalent step for a different `DistAlgorithm`.
// This cannot be a `From` impl, because it would conflict with `impl From<T> for T`.
pub fn convert<D2>(self) -> Step<D2>
where
D2: DistAlgorithm<NodeId = D::NodeId, Output = D::Output, Message = D::Message>,
{
Step {
output: self.output,
fault_log: self.fault_log,
messages: self.messages,
}
}
/// Returns `true` if there are no messages, faults or outputs. /// Returns `true` if there are no messages, faults or outputs.
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.output.is_empty() && self.fault_log.is_empty() && self.messages.is_empty() self.output.is_empty() && self.fault_log.is_empty() && self.messages.is_empty()
} }
} }
impl<D: DistAlgorithm> From<FaultLog<D::NodeId>> for Step<D> { impl<M, O, N> From<FaultLog<N>> for Step<M, O, N> {
fn from(fault_log: FaultLog<D::NodeId>) -> Self { fn from(fault_log: FaultLog<N>) -> Self {
Step { Step {
fault_log, fault_log,
..Step::default() ..Step::default()
@ -172,8 +146,8 @@ impl<D: DistAlgorithm> From<FaultLog<D::NodeId>> for Step<D> {
} }
} }
impl<D: DistAlgorithm> From<Fault<D::NodeId>> for Step<D> { impl<M, O, N> From<Fault<N>> for Step<M, O, N> {
fn from(fault: Fault<D::NodeId>) -> Self { fn from(fault: Fault<N>) -> Self {
Step { Step {
fault_log: fault.into(), fault_log: fault.into(),
..Step::default() ..Step::default()
@ -181,8 +155,8 @@ impl<D: DistAlgorithm> From<Fault<D::NodeId>> for Step<D> {
} }
} }
impl<D: DistAlgorithm> From<TargetedMessage<D::Message, D::NodeId>> for Step<D> { impl<M, O, N> From<TargetedMessage<M, N>> for Step<M, O, N> {
fn from(msg: TargetedMessage<D::Message, D::NodeId>) -> Self { fn from(msg: TargetedMessage<M, N>) -> Self {
Step { Step {
messages: once(msg).collect(), messages: once(msg).collect(),
..Step::default() ..Step::default()
@ -190,10 +164,9 @@ impl<D: DistAlgorithm> From<TargetedMessage<D::Message, D::NodeId>> for Step<D>
} }
} }
impl<D, I> From<I> for Step<D> impl<I, M, O, N> From<I> for Step<M, O, N>
where where
D: DistAlgorithm, I: IntoIterator<Item = TargetedMessage<M, N>>,
I: IntoIterator<Item = TargetedMessage<D::Message, D::NodeId>>,
{ {
fn from(msgs: I) -> Self { fn from(msgs: I) -> Self {
Step { Step {
@ -233,26 +206,28 @@ impl<M: Epoched, N> Epoched for TargetedMessage<M, N> {
} }
} }
impl<'i, D> Step<D> /// An alias for the type of `Step` returned by `D`'s methods.
pub type DaStep<D> =
Step<<D as DistAlgorithm>::Message, <D as DistAlgorithm>::Output, <D as DistAlgorithm>::NodeId>;
impl<'i, M, O, N> Step<M, O, N>
where where
D: DistAlgorithm, N: NodeIdT + Rand,
<D as DistAlgorithm>::NodeId: NodeIdT + Rand, M: 'i + Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
<D as DistAlgorithm>::Message:
'i + Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
{ {
/// Removes and returns any messages that are not yet accepted by remote nodes according to the /// Removes and returns any messages that are not yet accepted by remote nodes according to the
/// mapping `remote_epochs`. This way the returned messages are postponed until later, and the /// mapping `remote_epochs`. This way the returned messages are postponed until later, and the
/// remaining messages can be sent to remote nodes without delay. /// remaining messages can be sent to remote nodes without delay.
pub fn defer_messages( pub fn defer_messages(
&mut self, &mut self,
peer_epochs: &'i BTreeMap<D::NodeId, <D::Message as Epoched>::LinEpoch>, peer_epochs: &'i BTreeMap<N, <M as Epoched>::LinEpoch>,
max_future_epochs: u64, max_future_epochs: u64,
) -> Vec<(D::NodeId, D::Message)> ) -> Vec<(N, M)>
where where
<D as DistAlgorithm>::NodeId: 'i, N: 'i,
{ {
let messages = &mut self.messages; let messages = &mut self.messages;
let mut deferred_msgs: Vec<(D::NodeId, D::Message)> = Vec::new(); let mut deferred_msgs: Vec<(N, M)> = Vec::new();
let mut passed_msgs: Vec<_> = Vec::new(); let mut passed_msgs: Vec<_> = Vec::new();
for msg in messages.drain(..) { for msg in messages.drain(..) {
match msg.target.clone() { match msg.target.clone() {
@ -306,7 +281,7 @@ pub trait DistAlgorithm: Send + Sync {
type Error: Fail; type Error: Fail;
/// Handles an input provided by the user, and returns /// Handles an input provided by the user, and returns
fn handle_input(&mut self, input: Self::Input) -> Result<Step<Self>, Self::Error> fn handle_input(&mut self, input: Self::Input) -> Result<DaStep<Self>, Self::Error>
where where
Self: Sized; Self: Sized;
@ -315,7 +290,7 @@ pub trait DistAlgorithm: Send + Sync {
&mut self, &mut self,
sender_id: &Self::NodeId, sender_id: &Self::NodeId,
message: Self::Message, message: Self::Message,
) -> Result<Step<Self>, Self::Error> ) -> Result<DaStep<Self>, Self::Error>
where where
Self: Sized; Self: Sized;

View File

@ -16,7 +16,7 @@ use std::sync::{Arc, Mutex};
use hbbft::binary_agreement::{BinaryAgreement, MessageContent, SbvMessage}; use hbbft::binary_agreement::{BinaryAgreement, MessageContent, SbvMessage};
use hbbft::threshold_sign::ThresholdSign; use hbbft::threshold_sign::ThresholdSign;
use hbbft::{DistAlgorithm, NetworkInfo, Step}; use hbbft::{DaStep, DistAlgorithm, NetworkInfo};
use net::adversary::{NetMutHandle, QueuePosition}; use net::adversary::{NetMutHandle, QueuePosition};
use net::err::CrankError; use net::err::CrankError;
@ -415,7 +415,7 @@ impl Adversary<Algo> for AbaCommonCoinAdversary {
&mut self, &mut self,
_: NetMutHandle<Algo>, _: NetMutHandle<Algo>,
msg: NetMessage<Algo>, msg: NetMessage<Algo>,
) -> Result<Step<Algo>, CrankError<Algo>> { ) -> Result<DaStep<Algo>, CrankError<Algo>> {
if let MessageContent::Coin(ref coin_msg) = msg.payload().content { if let MessageContent::Coin(ref coin_msg) = msg.payload().content {
let mut new_coin_state = None; let mut new_coin_state = None;
if let CoinState::InProgress(ref mut coin) = self.coin_state { if let CoinState::InProgress(ref mut coin) = self.coin_state {
@ -430,7 +430,7 @@ impl Adversary<Algo> for AbaCommonCoinAdversary {
self.coin_state = new_coin_state; self.coin_state = new_coin_state;
} }
} }
Ok(Step::default()) Ok(DaStep::<Algo>::default())
} }
} }

View File

@ -38,7 +38,7 @@ use std::{cmp, fmt};
use rand::Rng; use rand::Rng;
use hbbft::{DistAlgorithm, Step}; use hbbft::{DaStep, DistAlgorithm};
use net::{CrankError, NetMessage, Node, VirtualNet}; use net::{CrankError, NetMessage, Node, VirtualNet};
@ -143,7 +143,7 @@ where
} }
/// Normally dispatch a message /// Normally dispatch a message
pub fn dispatch_message(&mut self, msg: NetMessage<D>) -> Result<Step<D>, CrankError<D>> { pub fn dispatch_message(&mut self, msg: NetMessage<D>) -> Result<DaStep<D>, CrankError<D>> {
self.0.dispatch_message(msg) self.0.dispatch_message(msg)
} }
@ -336,7 +336,7 @@ where
&mut self, &mut self,
mut net: NetMutHandle<D>, mut net: NetMutHandle<D>,
msg: NetMessage<D>, msg: NetMessage<D>,
) -> Result<Step<D>, CrankError<D>> { ) -> Result<DaStep<D>, CrankError<D>> {
net.dispatch_message(msg) net.dispatch_message(msg)
} }
} }

View File

@ -26,7 +26,7 @@ use threshold_crypto as crypto;
use hbbft::dynamic_honey_badger::Batch; use hbbft::dynamic_honey_badger::Batch;
use hbbft::util::SubRng; use hbbft::util::SubRng;
use hbbft::{self, Contribution, DistAlgorithm, NetworkInfo, NodeIdT, Step}; use hbbft::{self, Contribution, DaStep, DistAlgorithm, NetworkInfo, NodeIdT, Step};
use try_some; use try_some;
@ -197,7 +197,7 @@ pub type NetMessage<D> =
fn process_step<'a, D>( fn process_step<'a, D>(
nodes: &'a mut collections::BTreeMap<D::NodeId, Node<D>>, nodes: &'a mut collections::BTreeMap<D::NodeId, Node<D>>,
sender: D::NodeId, sender: D::NodeId,
step: &Step<D>, step: &DaStep<D>,
dest: &mut collections::VecDeque<NetMessage<D>>, dest: &mut collections::VecDeque<NetMessage<D>>,
) -> usize ) -> usize
where where
@ -307,7 +307,7 @@ where
/// Number of faulty nodes in the network. /// Number of faulty nodes in the network.
num_faulty: usize, num_faulty: usize,
/// Dist-algorithm constructor function. /// Dist-algorithm constructor function.
cons: Option<Box<Fn(NewNodeInfo<D>) -> (D, Step<D>)>>, cons: Option<Box<Fn(NewNodeInfo<D>) -> (D, DaStep<D>)>>,
/// Network adversary. /// Network adversary.
adversary: Option<Box<dyn Adversary<D>>>, adversary: Option<Box<dyn Adversary<D>>>,
/// Trace-enabling flag. `None` means use environment. /// Trace-enabling flag. `None` means use environment.
@ -463,7 +463,7 @@ where
#[inline] #[inline]
pub fn using_step<F>(mut self, cons: F) -> Self pub fn using_step<F>(mut self, cons: F) -> Self
where where
F: Fn(NewNodeInfo<D>) -> (D, Step<D>) + 'static, F: Fn(NewNodeInfo<D>) -> (D, DaStep<D>) + 'static,
{ {
self.cons = Some(Box::new(cons)); self.cons = Some(Box::new(cons));
self self
@ -729,7 +729,7 @@ where
cons: F, cons: F,
) -> Result<Self, crypto::error::Error> ) -> Result<Self, crypto::error::Error>
where where
F: Fn(NewNodeInfo<D>) -> (D, Step<D>), F: Fn(NewNodeInfo<D>) -> (D, DaStep<D>),
I: IntoIterator<Item = D::NodeId>, I: IntoIterator<Item = D::NodeId>,
R: rand::Rng, R: rand::Rng,
{ {
@ -789,7 +789,7 @@ where
/// ///
/// Retrieves the receiving node for a `msg` and hands over the payload. /// Retrieves the receiving node for a `msg` and hands over the payload.
#[inline] #[inline]
pub fn dispatch_message(&mut self, msg: NetMessage<D>) -> Result<Step<D>, CrankError<D>> { pub fn dispatch_message(&mut self, msg: NetMessage<D>) -> Result<DaStep<D>, CrankError<D>> {
let node = self let node = self
.nodes .nodes
.get_mut(&msg.to) .get_mut(&msg.to)
@ -816,7 +816,7 @@ where
/// ///
/// Panics if `id` does not name a valid node. /// Panics if `id` does not name a valid node.
#[inline] #[inline]
pub fn send_input(&mut self, id: D::NodeId, input: D::Input) -> Result<Step<D>, D::Error> { pub fn send_input(&mut self, id: D::NodeId, input: D::Input) -> Result<DaStep<D>, D::Error> {
let step = self let step = self
.nodes .nodes
.get_mut(&id) .get_mut(&id)
@ -842,7 +842,7 @@ where
/// If a successful `Step` was generated, all of its messages are queued on the network and the /// If a successful `Step` was generated, all of its messages are queued on the network and the
/// `Step` is returned. /// `Step` is returned.
#[inline] #[inline]
pub fn crank(&mut self) -> Option<Result<(D::NodeId, Step<D>), CrankError<D>>> { pub fn crank(&mut self) -> Option<Result<(D::NodeId, DaStep<D>), CrankError<D>>> {
// Check limits. // Check limits.
if let Some(limit) = self.crank_limit { if let Some(limit) = self.crank_limit {
if self.crank_count >= limit { if self.crank_count >= limit {
@ -893,7 +893,7 @@ where
.ok_or_else(|| CrankError::NodeDisappeared(msg.to.clone())) .ok_or_else(|| CrankError::NodeDisappeared(msg.to.clone()))
).is_faulty(); ).is_faulty();
let step: Step<_> = if is_faulty { let step: Step<_, _, _> = if is_faulty {
// The swap-dance is painful here, as we are creating an `opt_step` just to avoid // The swap-dance is painful here, as we are creating an `opt_step` just to avoid
// borrow issues. // borrow issues.
let mut adv = self.adversary.take(); let mut adv = self.adversary.take();
@ -932,7 +932,7 @@ where
/// ///
/// Shortcut for cranking the network, expecting both progress to be made as well as processing /// Shortcut for cranking the network, expecting both progress to be made as well as processing
/// to proceed. /// to proceed.
pub fn crank_expect(&mut self) -> (D::NodeId, Step<D>) { pub fn crank_expect(&mut self) -> (D::NodeId, DaStep<D>) {
self.crank() self.crank()
.expect("crank: network queue empty") .expect("crank: network queue empty")
.expect("crank: node failed to process step") .expect("crank: node failed to process step")
@ -956,7 +956,7 @@ where
pub fn broadcast_input<'a>( pub fn broadcast_input<'a>(
&'a mut self, &'a mut self,
input: &'a D::Input, input: &'a D::Input,
) -> Result<Vec<(D::NodeId, Step<D>)>, D::Error> { ) -> Result<Vec<(D::NodeId, DaStep<D>)>, D::Error> {
// Note: The tricky lifetime annotation basically says that the input value given must // Note: The tricky lifetime annotation basically says that the input value given must
// live as long as the iterator returned lives (because it is cloned on every step, // live as long as the iterator returned lives (because it is cloned on every step,
// with steps only evaluated each time `next()` is called. For the same reason the // with steps only evaluated each time `next()` is called. For the same reason the
@ -1050,7 +1050,7 @@ where
D::Message: Clone, D::Message: Clone,
D::Output: Clone, D::Output: Clone,
{ {
type Item = Result<(D::NodeId, Step<D>), CrankError<D>>; type Item = Result<(D::NodeId, DaStep<D>), CrankError<D>>;
#[inline] #[inline]
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {

View File

@ -10,7 +10,9 @@ use rand_derive::Rand;
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use hbbft::dynamic_honey_badger::Batch; use hbbft::dynamic_honey_badger::Batch;
use hbbft::{Contribution, DistAlgorithm, Fault, NetworkInfo, Step, Target, TargetedMessage}; use hbbft::{
Contribution, DaStep, DistAlgorithm, Fault, NetworkInfo, Step, Target, TargetedMessage,
};
/// A node identifier. In the tests, nodes are simply numbered. /// A node identifier. In the tests, nodes are simply numbered.
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy, Serialize, Deserialize, Rand)] #[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Clone, Copy, Serialize, Deserialize, Rand)]
@ -59,7 +61,7 @@ impl<D: DistAlgorithm> TestNode<D> {
} }
/// Creates a new test node with the given broadcast instance. /// Creates a new test node with the given broadcast instance.
fn new((algo, step): (D, Step<D>)) -> TestNode<D> { fn new((algo, step): (D, DaStep<D>)) -> TestNode<D> {
TestNode { TestNode {
id: algo.our_id().clone(), id: algo.our_id().clone(),
algo, algo,
@ -403,7 +405,7 @@ where
new_algo: F, new_algo: F,
) -> TestNetwork<A, D> ) -> TestNetwork<A, D>
where where
F: Fn(Arc<NetworkInfo<NodeId>>) -> (D, Step<D>), F: Fn(Arc<NetworkInfo<NodeId>>) -> (D, DaStep<D>),
G: Fn(BTreeMap<D::NodeId, Arc<NetworkInfo<D::NodeId>>>) -> A, G: Fn(BTreeMap<D::NodeId, Arc<NetworkInfo<D::NodeId>>>) -> A,
{ {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();