mirror of https://github.com/poanetwork/hbbft.git
added step output in DistAlgorithm
This commit is contained in:
parent
3bf32832bc
commit
0ba06fdb76
|
@ -76,7 +76,7 @@ use agreement::bin_values::BinValues;
|
|||
use common_coin;
|
||||
use common_coin::{CommonCoin, CommonCoinMessage};
|
||||
use fault_log::FaultLog;
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage};
|
||||
|
||||
error_chain!{
|
||||
links {
|
||||
|
@ -189,6 +189,8 @@ pub struct Agreement<NodeUid> {
|
|||
coin_schedule: CoinSchedule,
|
||||
}
|
||||
|
||||
pub type AgreementStep<N> = Step<N, bool>;
|
||||
|
||||
impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
|
||||
type NodeUid = NodeUid;
|
||||
type Input = bool;
|
||||
|
@ -196,8 +198,9 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
|
|||
type Message = AgreementMessage;
|
||||
type Error = Error;
|
||||
|
||||
fn input(&mut self, input: Self::Input) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
self.set_input(input)
|
||||
fn input(&mut self, input: Self::Input) -> AgreementResult<AgreementStep<NodeUid>> {
|
||||
let fault_log = self.set_input(input);
|
||||
self.step().with_fault_log(fault_log)
|
||||
}
|
||||
|
||||
/// Receive input from a remote node.
|
||||
|
@ -205,23 +208,24 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
|
|||
&mut self,
|
||||
sender_id: &Self::NodeUid,
|
||||
message: Self::Message,
|
||||
) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
) -> AgreementResult<AgreementStep<NodeUid>> {
|
||||
if self.terminated || message.epoch < self.epoch {
|
||||
// Message is obsolete; we're already in a later epoch or terminated.
|
||||
return Ok(FaultLog::new());
|
||||
// Message is obsolete: We are already in a later epoch or terminated.
|
||||
return self.step();
|
||||
}
|
||||
if message.epoch > self.epoch {
|
||||
// Message is for a later epoch. We can't handle that yet.
|
||||
self.incoming_queue.push((sender_id.clone(), message));
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
match message.content {
|
||||
AgreementContent::BVal(b) => self.handle_bval(sender_id, b),
|
||||
AgreementContent::Aux(b) => self.handle_aux(sender_id, b),
|
||||
AgreementContent::Conf(v) => self.handle_conf(sender_id, v),
|
||||
AgreementContent::Term(v) => self.handle_term(sender_id, v).map(|()| FaultLog::new()),
|
||||
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg),
|
||||
return self.step();
|
||||
}
|
||||
let fault_log = match message.content {
|
||||
AgreementContent::BVal(b) => self.handle_bval(sender_id, b)?,
|
||||
AgreementContent::Aux(b) => self.handle_aux(sender_id, b)?,
|
||||
AgreementContent::Conf(v) => self.handle_conf(sender_id, v)?,
|
||||
AgreementContent::Term(v) => self.handle_term(sender_id, v)?.map(|()| FaultLog::new()),
|
||||
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg)?,
|
||||
};
|
||||
self.step().with_fault_log(fault_log)
|
||||
}
|
||||
|
||||
/// Take the next Agreement message for multicast to all other nodes.
|
||||
|
@ -231,11 +235,6 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
|
|||
.map(|msg| Target::All.message(msg))
|
||||
}
|
||||
|
||||
/// Consume the output. Once consumed, the output stays `None` forever.
|
||||
fn next_output(&mut self) -> Option<Self::Output> {
|
||||
self.output.take()
|
||||
}
|
||||
|
||||
/// Whether the algorithm has terminated.
|
||||
fn terminated(&self) -> bool {
|
||||
self.terminated
|
||||
|
@ -283,8 +282,12 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
}
|
||||
}
|
||||
|
||||
fn step(&mut self) -> AgreementResult<AgreementStep<NodeUid>> {
|
||||
Ok(Step::new(replace(&mut self.output, None)))
|
||||
}
|
||||
|
||||
/// Sets the input value for agreement.
|
||||
pub fn set_input(&mut self, input: bool) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
fn set_input(&mut self, input: bool) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
if self.epoch != 0 || self.estimated.is_some() {
|
||||
return Err(ErrorKind::InputNotAccepted.into());
|
||||
}
|
||||
|
@ -297,7 +300,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
// Set the initial estimated value to the input value.
|
||||
self.estimated = Some(input);
|
||||
// Record the input value as sent.
|
||||
self.send_bval(input)
|
||||
self.send_bval(input);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -468,16 +471,16 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
sender_id: &NodeUid,
|
||||
msg: CommonCoinMessage,
|
||||
) -> AgreementResult<FaultLog<NodeUid>> {
|
||||
let mut fault_log = self.common_coin.handle_message(sender_id, msg)?;
|
||||
let mut coin_step = self.common_coin.handle_message(sender_id, msg)?;
|
||||
self.extend_common_coin();
|
||||
|
||||
if let Some(coin) = self.common_coin.next_output() {
|
||||
if let Some(coin) = coin_step.output {
|
||||
let def_bin_value = self.count_conf().1.definite();
|
||||
self.on_coin(coin, def_bin_value)?
|
||||
.merge_into(&mut fault_log);
|
||||
.merge_into(&mut coin_step.fault_log);
|
||||
}
|
||||
|
||||
Ok(fault_log)
|
||||
Ok(coin_step.fault_log)
|
||||
}
|
||||
|
||||
/// When the common coin has been computed, tries to decide on an output value, updates the
|
||||
|
@ -503,8 +506,13 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
|||
let mut fault_log = self.send_bval(b)?;
|
||||
let queued_msgs = replace(&mut self.incoming_queue, Vec::new());
|
||||
for (sender_id, msg) in queued_msgs {
|
||||
self.handle_message(&sender_id, msg)?
|
||||
.merge_into(&mut fault_log);
|
||||
let step = self.handle_message(&sender_id, msg)?;
|
||||
fault_log.extend(step.fault_log);
|
||||
// Save the output of the internal call.
|
||||
self.output = step.output;
|
||||
if self.terminated {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@
|
|||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::fmt::{self, Debug};
|
||||
use std::iter::once;
|
||||
use std::mem::replace;
|
||||
use std::sync::Arc;
|
||||
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
|
@ -54,7 +55,7 @@ use ring::digest;
|
|||
|
||||
use fault_log::{FaultKind, FaultLog};
|
||||
use fmt::{HexBytes, HexList, HexProof};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage};
|
||||
|
||||
error_chain!{
|
||||
types {
|
||||
|
@ -118,6 +119,8 @@ pub struct Broadcast<NodeUid> {
|
|||
output: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
pub type BroadcastStep<N> = Step<N, Vec<u8>>;
|
||||
|
||||
impl<NodeUid: Debug + Clone + Ord> DistAlgorithm for Broadcast<NodeUid> {
|
||||
type NodeUid = NodeUid;
|
||||
// TODO: Allow anything serializable and deserializable, i.e. make this a type parameter
|
||||
|
@ -127,7 +130,7 @@ impl<NodeUid: Debug + Clone + Ord> DistAlgorithm for Broadcast<NodeUid> {
|
|||
type Message = BroadcastMessage;
|
||||
type Error = Error;
|
||||
|
||||
fn input(&mut self, input: Self::Input) -> BroadcastResult<FaultLog<NodeUid>> {
|
||||
fn input(&mut self, input: Self::Input) -> BroadcastResult<BroadcastStep<NodeUid>> {
|
||||
if *self.netinfo.our_uid() != self.proposer_id {
|
||||
return Err(ErrorKind::InstanceCannotPropose.into());
|
||||
}
|
||||
|
@ -136,34 +139,32 @@ impl<NodeUid: Debug + Clone + Ord> DistAlgorithm for Broadcast<NodeUid> {
|
|||
// from this tree and send them, each to its own node.
|
||||
let proof = self.send_shards(input)?;
|
||||
let our_uid = &self.netinfo.our_uid().clone();
|
||||
self.handle_value(our_uid, proof)
|
||||
let fault_log = self.handle_value(our_uid, proof)?;
|
||||
self.step().with_fault_log(fault_log)
|
||||
}
|
||||
|
||||
fn handle_message(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
message: Self::Message,
|
||||
) -> BroadcastResult<FaultLog<NodeUid>> {
|
||||
) -> BroadcastResult<BroadcastStep<NodeUid>> {
|
||||
if !self.netinfo.all_uids().contains(sender_id) {
|
||||
return Err(ErrorKind::UnknownSender.into());
|
||||
}
|
||||
match message {
|
||||
BroadcastMessage::Value(p) => self.handle_value(sender_id, p),
|
||||
BroadcastMessage::Echo(p) => self.handle_echo(sender_id, p),
|
||||
BroadcastMessage::Ready(ref hash) => {
|
||||
self.handle_ready(sender_id, hash).map(|()| FaultLog::new())
|
||||
}
|
||||
}
|
||||
let fault_log = match message {
|
||||
BroadcastMessage::Value(p) => self.handle_value(sender_id, p)?,
|
||||
BroadcastMessage::Echo(p) => self.handle_echo(sender_id, p)?,
|
||||
BroadcastMessage::Ready(ref hash) => self
|
||||
.handle_ready(sender_id, hash)?
|
||||
.map(|()| FaultLog::new()),
|
||||
};
|
||||
self.step().with_fault_log(fault_log)
|
||||
}
|
||||
|
||||
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
|
||||
self.messages.pop_front()
|
||||
}
|
||||
|
||||
fn next_output(&mut self) -> Option<Self::Output> {
|
||||
self.output.take()
|
||||
}
|
||||
|
||||
fn terminated(&self) -> bool {
|
||||
self.decided
|
||||
}
|
||||
|
@ -196,6 +197,10 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
|
|||
})
|
||||
}
|
||||
|
||||
fn step(&mut self) -> BroadcastResult<BroadcastStep<NodeUid>> {
|
||||
Ok(Step::new(replace(&mut self.output, None)))
|
||||
}
|
||||
|
||||
/// Breaks the input value into shards of equal length and encodes them --
|
||||
/// and some extra parity shards -- with a Reed-Solomon erasure coding
|
||||
/// scheme. The returned value contains the shard assigned to this
|
||||
|
|
|
@ -23,12 +23,13 @@
|
|||
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::fmt::Debug;
|
||||
use std::mem::replace;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crypto::error as cerror;
|
||||
use crypto::Signature;
|
||||
use fault_log::{FaultKind, FaultLog};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage};
|
||||
|
||||
error_chain! {
|
||||
links {
|
||||
|
@ -78,6 +79,8 @@ pub struct CommonCoin<NodeUid, T> {
|
|||
terminated: bool,
|
||||
}
|
||||
|
||||
pub type CommonCoinStep<N> = Step<N, bool>;
|
||||
|
||||
impl<NodeUid, T> DistAlgorithm for CommonCoin<NodeUid, T>
|
||||
where
|
||||
NodeUid: Clone + Debug + Ord,
|
||||
|
@ -90,13 +93,14 @@ where
|
|||
type Error = Error;
|
||||
|
||||
/// Sends our threshold signature share if not yet sent.
|
||||
fn input(&mut self, _input: Self::Input) -> Result<FaultLog<NodeUid>> {
|
||||
if !self.had_input {
|
||||
fn input(&mut self, _input: Self::Input) -> Result<CommonCoinStep<NodeUid>> {
|
||||
let fault_log = if !self.had_input {
|
||||
self.had_input = true;
|
||||
self.get_coin()
|
||||
self.get_coin()?
|
||||
} else {
|
||||
Ok(FaultLog::new())
|
||||
}
|
||||
FaultLog::new()
|
||||
};
|
||||
self.step().with_fault_log(fault_log)
|
||||
}
|
||||
|
||||
/// Receives input from a remote node.
|
||||
|
@ -104,12 +108,14 @@ where
|
|||
&mut self,
|
||||
sender_id: &Self::NodeUid,
|
||||
message: Self::Message,
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
if self.terminated {
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
let CommonCoinMessage(share) = message;
|
||||
self.handle_share(sender_id, share)
|
||||
) -> Result<CommonCoinStep<NodeUid>> {
|
||||
let fault_log = if !self.terminated {
|
||||
let CommonCoinMessage(share) = message;
|
||||
self.handle_share(sender_id, share)?
|
||||
} else {
|
||||
FaultLog::default()
|
||||
};
|
||||
self.step().with_fault_log(fault_log)
|
||||
}
|
||||
|
||||
/// Takes the next share of a threshold signature message for multicasting to all other nodes.
|
||||
|
@ -119,11 +125,6 @@ where
|
|||
.map(|msg| Target::All.message(msg))
|
||||
}
|
||||
|
||||
/// Consumes the output. Once consumed, the output stays `None` forever.
|
||||
fn next_output(&mut self) -> Option<Self::Output> {
|
||||
self.output.take()
|
||||
}
|
||||
|
||||
/// Whether the algorithm has terminated.
|
||||
fn terminated(&self) -> bool {
|
||||
self.terminated
|
||||
|
@ -151,6 +152,10 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
fn step(&mut self) -> Result<CommonCoinStep<NodeUid>> {
|
||||
Ok(Step::new(replace(&mut self.output, None)))
|
||||
}
|
||||
|
||||
fn get_coin(&mut self) -> Result<FaultLog<NodeUid>> {
|
||||
if !self.netinfo.is_validator() {
|
||||
self.try_output()?;
|
||||
|
|
|
@ -25,13 +25,14 @@
|
|||
|
||||
use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
||||
use std::fmt::Debug;
|
||||
use std::mem::replace;
|
||||
use std::sync::Arc;
|
||||
|
||||
use agreement::{self, Agreement, AgreementMessage, AgreementResult};
|
||||
use broadcast::{self, Broadcast, BroadcastMessage, BroadcastResult};
|
||||
use agreement::{self, Agreement, AgreementMessage, AgreementStep};
|
||||
use broadcast::{self, Broadcast, BroadcastMessage, BroadcastStep};
|
||||
use fault_log::FaultLog;
|
||||
use fmt::HexBytes;
|
||||
use messaging::{DistAlgorithm, NetworkInfo, TargetedMessage};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Step, TargetedMessage};
|
||||
|
||||
error_chain!{
|
||||
types {
|
||||
|
@ -101,6 +102,8 @@ pub struct CommonSubset<NodeUid> {
|
|||
decided: bool,
|
||||
}
|
||||
|
||||
pub type CommonSubsetStep<NodeUid> = Step<NodeUid, BTreeMap<NodeUid, ProposedValue>>;
|
||||
|
||||
impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for CommonSubset<NodeUid> {
|
||||
type NodeUid = NodeUid;
|
||||
type Input = ProposedValue;
|
||||
|
@ -108,34 +111,32 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for CommonSubset<NodeUid> {
|
|||
type Message = Message<NodeUid>;
|
||||
type Error = Error;
|
||||
|
||||
fn input(&mut self, input: Self::Input) -> CommonSubsetResult<FaultLog<NodeUid>> {
|
||||
fn input(&mut self, input: Self::Input) -> CommonSubsetResult<CommonSubsetStep<NodeUid>> {
|
||||
debug!(
|
||||
"{:?} Proposing {:?}",
|
||||
self.netinfo.our_uid(),
|
||||
HexBytes(&input)
|
||||
);
|
||||
self.send_proposed_value(input)
|
||||
let fault_log = self.send_proposed_value(input)?;
|
||||
self.step().with_fault_log(fault_log)
|
||||
}
|
||||
|
||||
fn handle_message(
|
||||
&mut self,
|
||||
sender_id: &Self::NodeUid,
|
||||
message: Self::Message,
|
||||
) -> CommonSubsetResult<FaultLog<NodeUid>> {
|
||||
match message {
|
||||
Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg),
|
||||
Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, a_msg),
|
||||
}
|
||||
) -> CommonSubsetResult<CommonSubsetStep<NodeUid>> {
|
||||
let fault_log = match message {
|
||||
Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg)?,
|
||||
Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, a_msg)?,
|
||||
};
|
||||
self.step().with_fault_log(fault_log)
|
||||
}
|
||||
|
||||
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, Self::NodeUid>> {
|
||||
self.messages.pop_front()
|
||||
}
|
||||
|
||||
fn next_output(&mut self) -> Option<Self::Output> {
|
||||
self.output.take()
|
||||
}
|
||||
|
||||
fn terminated(&self) -> bool {
|
||||
self.messages.is_empty() && self.agreement_instances.values().all(Agreement::terminated)
|
||||
}
|
||||
|
@ -177,6 +178,10 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
|||
})
|
||||
}
|
||||
|
||||
fn step(&mut self) -> CommonSubsetResult<CommonSubsetStep<NodeUid>> {
|
||||
Ok(Step::new(replace(&mut self.output, None)))
|
||||
}
|
||||
|
||||
/// Common Subset input message handler. It receives a value for broadcast
|
||||
/// and redirects it to the corresponding broadcast instance.
|
||||
pub fn send_proposed_value(
|
||||
|
@ -224,7 +229,7 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
|||
f: F,
|
||||
) -> CommonSubsetResult<FaultLog<NodeUid>>
|
||||
where
|
||||
F: FnOnce(&mut Broadcast<NodeUid>) -> BroadcastResult<FaultLog<NodeUid>>,
|
||||
F: FnOnce(&mut Broadcast<NodeUid>) -> Result<BroadcastStep<NodeUid>, broadcast::Error>,
|
||||
{
|
||||
let mut fault_log = FaultLog::new();
|
||||
let value = {
|
||||
|
@ -232,9 +237,10 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
|||
.broadcast_instances
|
||||
.get_mut(proposer_id)
|
||||
.ok_or(ErrorKind::NoSuchBroadcastInstance)?;
|
||||
f(broadcast)?.merge_into(&mut fault_log);
|
||||
let step = f(broadcast)?;
|
||||
fault_log.extend(step.fault_log);
|
||||
self.messages.extend_broadcast(&proposer_id, broadcast);
|
||||
if let Some(output) = broadcast.next_output() {
|
||||
if let Some(output) = step.output {
|
||||
output
|
||||
} else {
|
||||
return Ok(fault_log);
|
||||
|
@ -243,9 +249,10 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
|||
self.broadcast_results.insert(proposer_id.clone(), value);
|
||||
let set_agreement_input = |agreement: &mut Agreement<NodeUid>| {
|
||||
if agreement.accepts_input() {
|
||||
agreement.set_input(true)
|
||||
// FIXME: Use the result.
|
||||
agreement.input(true)
|
||||
} else {
|
||||
Ok(FaultLog::new())
|
||||
Ok(Default::default())
|
||||
}
|
||||
};
|
||||
self.process_agreement(proposer_id, set_agreement_input)?
|
||||
|
@ -261,7 +268,7 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
|||
f: F,
|
||||
) -> CommonSubsetResult<FaultLog<NodeUid>>
|
||||
where
|
||||
F: FnOnce(&mut Agreement<NodeUid>) -> AgreementResult<FaultLog<NodeUid>>,
|
||||
F: FnOnce(&mut Agreement<NodeUid>) -> Result<AgreementStep<NodeUid>, agreement::Error>,
|
||||
{
|
||||
let mut fault_log = FaultLog::new();
|
||||
let value = {
|
||||
|
@ -272,9 +279,10 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
|||
if agreement.terminated() {
|
||||
return Ok(fault_log);
|
||||
}
|
||||
f(agreement)?.merge_into(&mut fault_log);
|
||||
let step = f(agreement)?;
|
||||
fault_log.extend(step.fault_log);
|
||||
self.messages.extend_agreement(proposer_id, agreement);
|
||||
if let Some(output) = agreement.next_output() {
|
||||
if let Some(output) = step.output {
|
||||
output
|
||||
} else {
|
||||
return Ok(fault_log);
|
||||
|
@ -298,9 +306,10 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
|||
// input 0 to each instance of BA that has not yet been provided input.
|
||||
for (uid, agreement) in &mut self.agreement_instances {
|
||||
if agreement.accepts_input() {
|
||||
agreement.set_input(false)?.merge_into(&mut fault_log);
|
||||
let step = agreement.set_input(false)?;
|
||||
fault_log.extend(step.fault_log);
|
||||
self.messages.extend_agreement(uid, agreement);
|
||||
if let Some(output) = agreement.next_output() {
|
||||
if let Some(output) = step.output {
|
||||
if self.agreement_results.insert(uid.clone(), output).is_some() {
|
||||
return Err(ErrorKind::MultipleAgreementResults.into());
|
||||
}
|
||||
|
|
|
@ -57,8 +57,8 @@ use serde::{Deserialize, Serialize};
|
|||
use self::votes::{SignedVote, VoteCounter};
|
||||
use crypto::{PublicKeySet, SecretKey, Signature};
|
||||
use fault_log::{FaultKind, FaultLog};
|
||||
use honey_badger::{HoneyBadger, Message as HbMessage};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
|
||||
use honey_badger::{HoneyBadger, HoneyBadgerStep, Message as HbMessage};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage};
|
||||
use sync_key_gen::{Accept, Propose, ProposeOutcome, SyncKeyGen};
|
||||
|
||||
pub use self::batch::Batch;
|
||||
|
@ -111,6 +111,8 @@ where
|
|||
output: VecDeque<Batch<C, NodeUid>>,
|
||||
}
|
||||
|
||||
pub type DynamicHoneyBadgerStep<C, NodeUid> = Step<NodeUid, Batch<C, NodeUid>>;
|
||||
|
||||
impl<C, NodeUid> DistAlgorithm for DynamicHoneyBadger<C, NodeUid>
|
||||
where
|
||||
C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
|
||||
|
@ -122,50 +124,50 @@ where
|
|||
type Message = Message<NodeUid>;
|
||||
type Error = Error;
|
||||
|
||||
fn input(&mut self, input: Self::Input) -> Result<FaultLog<NodeUid>> {
|
||||
fn input(&mut self, input: Self::Input) -> Result<DynamicHoneyBadgerStep<C, NodeUid>> {
|
||||
// User contributions are forwarded to `HoneyBadger` right away. Votes are signed and
|
||||
// broadcast.
|
||||
match input {
|
||||
Input::User(contrib) => self.propose(contrib),
|
||||
Input::Change(change) => {
|
||||
self.vote_for(change)?;
|
||||
Ok(FaultLog::new())
|
||||
}
|
||||
}
|
||||
let fault_log = match input {
|
||||
Input::User(contrib) => self.propose(contrib)?,
|
||||
Input::Change(change) => self.vote_for(change)?,
|
||||
};
|
||||
self.step().with_fault_log(fault_log)
|
||||
}
|
||||
|
||||
fn handle_message(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
message: Self::Message,
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
) -> Result<DynamicHoneyBadgerStep<C, NodeUid>> {
|
||||
let epoch = message.epoch();
|
||||
if epoch < self.start_epoch {
|
||||
return Ok(FaultLog::new()); // Obsolete message.
|
||||
}
|
||||
if epoch > self.start_epoch {
|
||||
let fault_log = if epoch < self.start_epoch {
|
||||
// Obsolete message.
|
||||
FaultLog::new()
|
||||
} else if epoch > self.start_epoch {
|
||||
// Message cannot be handled yet. Save it for later.
|
||||
let entry = (sender_id.clone(), message);
|
||||
self.incoming_queue.push(entry);
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
match message {
|
||||
Message::HoneyBadger(_, hb_msg) => self.handle_honey_badger_message(sender_id, hb_msg),
|
||||
Message::KeyGen(_, kg_msg, sig) => self.handle_key_gen_message(sender_id, kg_msg, *sig),
|
||||
Message::SignedVote(signed_vote) => {
|
||||
self.vote_counter.add_pending_vote(sender_id, signed_vote)
|
||||
FaultLog::new()
|
||||
} else {
|
||||
match message {
|
||||
Message::HoneyBadger(_, hb_msg) => {
|
||||
self.handle_honey_badger_message(sender_id, hb_msg)?
|
||||
}
|
||||
Message::KeyGen(_, kg_msg, sig) => {
|
||||
self.handle_key_gen_message(sender_id, kg_msg, *sig)?
|
||||
}
|
||||
Message::SignedVote(signed_vote) => {
|
||||
self.vote_counter.add_pending_vote(sender_id, signed_vote)?
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
self.step().with_fault_log(fault_log)
|
||||
}
|
||||
|
||||
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
|
||||
self.messages.pop_front()
|
||||
}
|
||||
|
||||
fn next_output(&mut self) -> Option<Self::Output> {
|
||||
self.output.pop_front()
|
||||
}
|
||||
|
||||
fn terminated(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
@ -180,6 +182,10 @@ where
|
|||
C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
|
||||
NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash,
|
||||
{
|
||||
fn step(&mut self) -> Result<DynamicHoneyBadgerStep<C, NodeUid>> {
|
||||
Ok(Step::new(self.output.take()))
|
||||
}
|
||||
|
||||
/// Returns a new `DynamicHoneyBadgerBuilder` configured to use the node IDs and cryptographic
|
||||
/// keys specified by `netinfo`.
|
||||
pub fn builder(netinfo: NetworkInfo<NodeUid>) -> DynamicHoneyBadgerBuilder<C, NodeUid> {
|
||||
|
@ -193,13 +199,13 @@ where
|
|||
|
||||
/// Proposes a contribution in the current epoch.
|
||||
pub fn propose(&mut self, contrib: C) -> Result<FaultLog<NodeUid>> {
|
||||
let mut fault_log = self.honey_badger.input(InternalContrib {
|
||||
let mut fault_log = FaultLog::new();
|
||||
let step = self.honey_badger.input(InternalContrib {
|
||||
contrib,
|
||||
key_gen_messages: self.key_gen_msg_buffer.clone(),
|
||||
votes: self.vote_counter.pending_votes().cloned().collect(),
|
||||
})?;
|
||||
self.process_output()?.merge_into(&mut fault_log);
|
||||
Ok(fault_log)
|
||||
self.process_output(step)
|
||||
}
|
||||
|
||||
/// Cast a vote to change the set of validators.
|
||||
|
@ -229,9 +235,8 @@ where
|
|||
return Err(ErrorKind::UnknownSender.into());
|
||||
}
|
||||
// Handle the message and put the outgoing messages into the queue.
|
||||
let mut fault_log = self.honey_badger.handle_message(sender_id, message)?;
|
||||
self.process_output()?.merge_into(&mut fault_log);
|
||||
Ok(fault_log)
|
||||
let step = self.honey_badger.handle_message(sender_id, message)?;
|
||||
self.process_output(step)
|
||||
}
|
||||
|
||||
/// Handles a vote or key generation message and tries to commit it as a transaction. These
|
||||
|
@ -245,14 +250,20 @@ where
|
|||
self.verify_signature(sender_id, &sig, &kg_msg)?;
|
||||
let tx = SignedKeyGenMsg(self.start_epoch, sender_id.clone(), kg_msg, sig);
|
||||
self.key_gen_msg_buffer.push(tx);
|
||||
// FIXME: Remove the call to `process_output`. There wasn't any output from HB in this
|
||||
// function.
|
||||
self.process_output()
|
||||
}
|
||||
|
||||
/// Processes all pending batches output by Honey Badger.
|
||||
fn process_output(&mut self) -> Result<FaultLog<NodeUid>> {
|
||||
fn process_output(
|
||||
&mut self,
|
||||
step: HoneyBadgerStep<NodeUid, InternalContrib<C, NodeUid>>,
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
fault_log.extend(step.fault_log);
|
||||
let start_epoch = self.start_epoch;
|
||||
while let Some(hb_batch) = self.honey_badger.next_output() {
|
||||
while let Some(hb_batch) = step.output.iter().next() {
|
||||
// Create the batch we output ourselves. It will contain the _user_ transactions of
|
||||
// `hb_batch`, and the current change state.
|
||||
let mut batch = Batch::new(hb_batch.epoch + self.start_epoch);
|
||||
|
@ -460,7 +471,7 @@ where
|
|||
|
||||
/// The contribution for the internal `HoneyBadger` instance: this includes a user-defined
|
||||
/// application-level contribution as well as internal signed messages.
|
||||
#[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
|
||||
#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
|
||||
struct InternalContrib<C, NodeUid> {
|
||||
/// A user-defined contribution.
|
||||
contrib: C,
|
||||
|
|
|
@ -3,6 +3,7 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
|||
use std::fmt::Debug;
|
||||
use std::hash::Hash;
|
||||
use std::marker::PhantomData;
|
||||
use std::mem::replace;
|
||||
use std::ops::Not;
|
||||
use std::sync::Arc;
|
||||
|
||||
|
@ -10,10 +11,10 @@ use bincode;
|
|||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use common_subset::{self, CommonSubset};
|
||||
use common_subset::{self, CommonSubset, CommonSubsetStep};
|
||||
use crypto::{Ciphertext, DecryptionShare};
|
||||
use fault_log::{FaultKind, FaultLog};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Target, TargetedMessage};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage};
|
||||
|
||||
error_chain!{
|
||||
types {
|
||||
|
@ -72,7 +73,7 @@ where
|
|||
common_subsets: BTreeMap::new(),
|
||||
max_future_epochs: self.max_future_epochs as u64,
|
||||
messages: MessageQueue(VecDeque::new()),
|
||||
output: VecDeque::new(),
|
||||
output: None,
|
||||
incoming_queue: BTreeMap::new(),
|
||||
received_shares: BTreeMap::new(),
|
||||
decrypted_contributions: BTreeMap::new(),
|
||||
|
@ -96,8 +97,8 @@ pub struct HoneyBadger<C, NodeUid> {
|
|||
max_future_epochs: u64,
|
||||
/// The messages that need to be sent to other nodes.
|
||||
messages: MessageQueue<NodeUid>,
|
||||
/// The outputs from completed epochs.
|
||||
output: VecDeque<Batch<C, NodeUid>>,
|
||||
/// The output from a completed epoch.
|
||||
output: Option<Batch<C, NodeUid>>,
|
||||
/// Messages for future epochs that couldn't be handled yet.
|
||||
incoming_queue: BTreeMap<u64, Vec<(NodeUid, MessageContent<NodeUid>)>>,
|
||||
/// Received decryption shares for an epoch. Each decryption share has a sender and a
|
||||
|
@ -110,6 +111,8 @@ pub struct HoneyBadger<C, NodeUid> {
|
|||
ciphertexts: BTreeMap<u64, BTreeMap<NodeUid, Ciphertext>>,
|
||||
}
|
||||
|
||||
pub type HoneyBadgerStep<C, NodeUid> = Step<NodeUid, Batch<C, NodeUid>>;
|
||||
|
||||
impl<C, NodeUid> DistAlgorithm for HoneyBadger<C, NodeUid>
|
||||
where
|
||||
C: Serialize + for<'r> Deserialize<'r> + Debug + Hash + Eq,
|
||||
|
@ -121,42 +124,37 @@ where
|
|||
type Message = Message<NodeUid>;
|
||||
type Error = Error;
|
||||
|
||||
fn input(&mut self, input: Self::Input) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
Ok(self.propose(&input)?)
|
||||
fn input(&mut self, input: Self::Input) -> HoneyBadgerResult<HoneyBadgerStep<C, NodeUid>> {
|
||||
let fault_log = self.propose(&input)?;
|
||||
self.step().with_fault_log(fault_log)
|
||||
}
|
||||
|
||||
fn handle_message(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
message: Self::Message,
|
||||
) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
) -> HoneyBadgerResult<HoneyBadgerStep<C, NodeUid>> {
|
||||
if !self.netinfo.all_uids().contains(sender_id) {
|
||||
return Err(ErrorKind::UnknownSender.into());
|
||||
}
|
||||
let Message { epoch, content } = message;
|
||||
if epoch < self.epoch {
|
||||
// Ignore all messages from past epochs.
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
let mut fault_log = FaultLog::new();
|
||||
if epoch > self.epoch + self.max_future_epochs {
|
||||
// Postpone handling this message.
|
||||
self.incoming_queue
|
||||
.entry(epoch)
|
||||
.or_insert_with(Vec::new)
|
||||
.push((sender_id.clone(), content));
|
||||
return Ok(FaultLog::new());
|
||||
}
|
||||
self.handle_message_content(sender_id, epoch, content)
|
||||
} else if epoch == self.epoch {
|
||||
fault_log.extend(self.handle_message_content(sender_id, epoch, content)?);
|
||||
} // And ignore all messages from past epochs.
|
||||
self.step().with_fault_log(fault_log)
|
||||
}
|
||||
|
||||
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
|
||||
self.messages.pop_front()
|
||||
}
|
||||
|
||||
fn next_output(&mut self) -> Option<Self::Output> {
|
||||
self.output.pop_front()
|
||||
}
|
||||
|
||||
fn terminated(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
@ -177,6 +175,10 @@ where
|
|||
HoneyBadgerBuilder::new(netinfo)
|
||||
}
|
||||
|
||||
fn step(&mut self) -> HoneyBadgerResult<HoneyBadgerStep<C, NodeUid>> {
|
||||
Ok(Step::new(replace(&mut self.output, Default::default())))
|
||||
}
|
||||
|
||||
/// Proposes a new item in the current epoch.
|
||||
pub fn propose(&mut self, proposal: &C) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
if !self.netinfo.is_validator() {
|
||||
|
@ -226,7 +228,7 @@ where
|
|||
message: common_subset::Message<NodeUid>,
|
||||
) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
{
|
||||
let step = {
|
||||
// Borrow the instance for `epoch`, or create it.
|
||||
let cs = match self.common_subsets.entry(epoch) {
|
||||
Entry::Occupied(entry) => entry.into_mut(),
|
||||
|
@ -240,13 +242,14 @@ where
|
|||
}
|
||||
};
|
||||
// Handle the message and put the outgoing messages into the queue.
|
||||
cs.handle_message(sender_id, message)?
|
||||
.merge_into(&mut fault_log);
|
||||
let step = cs.handle_message(sender_id, message)?;
|
||||
fault_log.extend(step.fault_log);
|
||||
self.messages.extend_with_epoch(epoch, cs);
|
||||
}
|
||||
step
|
||||
};
|
||||
// If this is the current epoch, the message could cause a new output.
|
||||
if epoch == self.epoch {
|
||||
self.process_output()?.merge_into(&mut fault_log);
|
||||
fault_log.extend(self.process_output(step)?);
|
||||
}
|
||||
self.remove_terminated(epoch);
|
||||
Ok(fault_log)
|
||||
|
@ -344,8 +347,8 @@ where
|
|||
batch.contributions
|
||||
);
|
||||
// Queue the output and advance the epoch.
|
||||
self.output.push_back(batch);
|
||||
self.update_epoch()?.merge_into(&mut fault_log);
|
||||
self.output = Some(batch);
|
||||
fault_log.extend(self.update_epoch()?);
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
|
@ -544,23 +547,18 @@ where
|
|||
}
|
||||
|
||||
/// Checks whether the current epoch has output, and if it does, sends out our decryption shares.
|
||||
fn process_output(&mut self) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
fn process_output(
|
||||
&mut self,
|
||||
step: CommonSubsetStep<NodeUid>,
|
||||
) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||
let mut fault_log = FaultLog::new();
|
||||
if let Some(cs_output) = self.take_current_output() {
|
||||
self.send_decryption_shares(cs_output)?
|
||||
.merge_into(&mut fault_log);
|
||||
if let Some(cs_output) = step.output {
|
||||
fault_log.extend(self.send_decryption_shares(cs_output)?);
|
||||
// TODO: May also check that there is no further output from Common Subset.
|
||||
}
|
||||
Ok(fault_log)
|
||||
}
|
||||
|
||||
/// Returns the output of the current epoch's `CommonSubset` instance, if any.
|
||||
fn take_current_output(&mut self) -> Option<BTreeMap<NodeUid, Vec<u8>>> {
|
||||
self.common_subsets
|
||||
.get_mut(&self.epoch)
|
||||
.and_then(CommonSubset::next_output)
|
||||
}
|
||||
|
||||
/// Removes all `CommonSubset` instances from _past_ epochs that have terminated.
|
||||
fn remove_terminated(&mut self, from_epoch: u64) {
|
||||
for epoch in from_epoch..self.epoch {
|
||||
|
|
|
@ -51,6 +51,45 @@ impl<M, N> TargetedMessage<M, N> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Result of one step of the local state machine of a distributed algorithm. Such a result should
|
||||
/// be used and never discarded by the client of the algorithm.
|
||||
pub struct Step<N, O>
|
||||
where
|
||||
N: Clone,
|
||||
{
|
||||
pub output: Option<O>,
|
||||
pub fault_log: FaultLog<N>,
|
||||
}
|
||||
|
||||
impl<N, O> Default for Step<N, O>
|
||||
where
|
||||
N: Clone,
|
||||
{
|
||||
fn default() -> Step<N, O> {
|
||||
Step {
|
||||
output: None,
|
||||
fault_log: FaultLog::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<N, O> Step<N, O>
|
||||
where
|
||||
N: Clone,
|
||||
{
|
||||
pub fn new(output: Option<O>) -> Self {
|
||||
Step {
|
||||
output,
|
||||
fault_log: FaultLog::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_fault_log(&mut self, fault_log: FaultLog<N>) -> &mut Self {
|
||||
self.fault_log = fault_log;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// A distributed algorithm that defines a message flow.
|
||||
pub trait DistAlgorithm {
|
||||
/// Unique node identifier.
|
||||
|
@ -66,21 +105,23 @@ pub trait DistAlgorithm {
|
|||
type Error: Debug;
|
||||
|
||||
/// Handles an input provided by the user, and returns
|
||||
fn input(&mut self, input: Self::Input) -> Result<FaultLog<Self::NodeUid>, Self::Error>;
|
||||
#[must_use]
|
||||
fn input(
|
||||
&mut self,
|
||||
input: Self::Input,
|
||||
) -> Result<Step<Self::NodeUid, Self::Output>, Self::Error>;
|
||||
|
||||
/// Handles a message received from node `sender_id`.
|
||||
#[must_use]
|
||||
fn handle_message(
|
||||
&mut self,
|
||||
sender_id: &Self::NodeUid,
|
||||
message: Self::Message,
|
||||
) -> Result<FaultLog<Self::NodeUid>, Self::Error>;
|
||||
) -> Result<Step<Self::NodeUid, Self::Output>, Self::Error>;
|
||||
|
||||
/// Returns a message that needs to be sent to another node.
|
||||
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, Self::NodeUid>>;
|
||||
|
||||
/// Returns the algorithm's output.
|
||||
fn next_output(&mut self) -> Option<Self::Output>;
|
||||
|
||||
/// Returns `true` if execution has completed and this instance can be dropped.
|
||||
fn terminated(&self) -> bool;
|
||||
|
||||
|
@ -94,14 +135,6 @@ pub trait DistAlgorithm {
|
|||
{
|
||||
MessageIter { algorithm: self }
|
||||
}
|
||||
|
||||
/// Returns an iterator over the algorithm's outputs.
|
||||
fn output_iter(&mut self) -> OutputIter<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
OutputIter { algorithm: self }
|
||||
}
|
||||
}
|
||||
|
||||
/// An iterator over a distributed algorithm's outgoing messages.
|
||||
|
@ -117,19 +150,6 @@ impl<'a, D: DistAlgorithm + 'a> Iterator for MessageIter<'a, D> {
|
|||
}
|
||||
}
|
||||
|
||||
/// An iterator over a distributed algorithm's pending outputs.
|
||||
pub struct OutputIter<'a, D: DistAlgorithm + 'a> {
|
||||
algorithm: &'a mut D,
|
||||
}
|
||||
|
||||
impl<'a, D: DistAlgorithm + 'a> Iterator for OutputIter<'a, D> {
|
||||
type Item = D::Output;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.algorithm.next_output()
|
||||
}
|
||||
}
|
||||
|
||||
/// Common data shared between algorithms.
|
||||
///
|
||||
/// *NOTE* `NetworkInfo` requires its `secret_key` to be heap allocated and
|
||||
|
|
|
@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
|
|||
|
||||
use dynamic_honey_badger::{self, Batch as DhbBatch, DynamicHoneyBadger, Message};
|
||||
use fault_log::FaultLog;
|
||||
use messaging::{DistAlgorithm, NetworkInfo, TargetedMessage};
|
||||
use messaging::{DistAlgorithm, NetworkInfo, Step, TargetedMessage};
|
||||
use transaction_queue::TransactionQueue;
|
||||
|
||||
pub use dynamic_honey_badger::{Change, ChangeState, Input};
|
||||
|
@ -121,6 +121,8 @@ where
|
|||
output: VecDeque<Batch<Tx, NodeUid>>,
|
||||
}
|
||||
|
||||
pub type QueueingHoneyBadgerStep<Tx, NodeUid> = Step<NodeUid, Batch<Tx, NodeUid>>;
|
||||
|
||||
impl<Tx, NodeUid> DistAlgorithm for QueueingHoneyBadger<Tx, NodeUid>
|
||||
where
|
||||
Tx: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash + Clone,
|
||||
|
@ -132,42 +134,45 @@ where
|
|||
type Message = Message<NodeUid>;
|
||||
type Error = Error;
|
||||
|
||||
fn input(&mut self, input: Self::Input) -> Result<FaultLog<NodeUid>> {
|
||||
fn input(&mut self, input: Self::Input) -> Result<QueueingHoneyBadgerStep<Tx, NodeUid>> {
|
||||
// User transactions are forwarded to `HoneyBadger` right away. Internal messages are
|
||||
// in addition signed and broadcast.
|
||||
match input {
|
||||
let fault_log = match input {
|
||||
Input::User(tx) => {
|
||||
self.queue.0.push_back(tx);
|
||||
Ok(FaultLog::new())
|
||||
FaultLog::new()
|
||||
}
|
||||
Input::Change(change) => Ok(self.dyn_hb.input(Input::Change(change))?),
|
||||
}
|
||||
Input::Change(change) => {
|
||||
let step = self.dyn_hb.input(Input::Change(change))?;
|
||||
// FIXME: Use the output since `dyn_hb` can output immediately on input.
|
||||
step.fault_log
|
||||
}
|
||||
};
|
||||
self.step().with_fault_log(fault_log)
|
||||
}
|
||||
|
||||
fn handle_message(
|
||||
&mut self,
|
||||
sender_id: &NodeUid,
|
||||
message: Self::Message,
|
||||
) -> Result<FaultLog<NodeUid>> {
|
||||
let mut fault_log = self.dyn_hb.handle_message(sender_id, message)?;
|
||||
while let Some(batch) = self.dyn_hb.next_output() {
|
||||
) -> Result<QueueingHoneyBadgerStep<Tx, NodeUid>> {
|
||||
let step = self.dyn_hb.handle_message(sender_id, message)?;
|
||||
let fault_log = FaultLog::new();
|
||||
fault_log.extend(step.fault_log);
|
||||
if let Some(batch) = step.output {
|
||||
self.queue.remove_all(batch.iter());
|
||||
self.output.push_back(batch);
|
||||
}
|
||||
if !self.dyn_hb.has_input() {
|
||||
self.propose()?.merge_into(&mut fault_log);
|
||||
fault_log.extend(self.propose()?);
|
||||
}
|
||||
Ok(fault_log)
|
||||
self.step().with_fault_log(fault_log)
|
||||
}
|
||||
|
||||
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
|
||||
self.dyn_hb.next_message()
|
||||
}
|
||||
|
||||
fn next_output(&mut self) -> Option<Self::Output> {
|
||||
self.output.pop_front()
|
||||
}
|
||||
|
||||
fn terminated(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
@ -188,11 +193,17 @@ where
|
|||
QueueingHoneyBadgerBuilder::new(netinfo)
|
||||
}
|
||||
|
||||
fn step(&mut self) -> Result<QueueingHoneyBadgerStep<Tx, NodeUid>> {
|
||||
Ok(Step::new(self.output.take()))
|
||||
}
|
||||
|
||||
/// Initiates the next epoch by proposing a batch from the queue.
|
||||
fn propose(&mut self) -> Result<FaultLog<NodeUid>> {
|
||||
let amount = self.batch_size / self.dyn_hb.netinfo().num_nodes();
|
||||
let proposal = self.queue.choose(amount, self.batch_size);
|
||||
Ok(self.dyn_hb.input(Input::User(proposal))?)
|
||||
let step = self.dyn_hb.input(Input::User(proposal))?;
|
||||
// FIXME: Use `step.output`.
|
||||
Ok(step.fault_log)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -38,8 +38,8 @@ impl<D: DistAlgorithm> TestNode<D> {
|
|||
|
||||
/// Inputs a value into the instance.
|
||||
pub fn input(&mut self, input: D::Input) {
|
||||
self.algo.input(input).expect("input");
|
||||
self.outputs.extend(self.algo.output_iter());
|
||||
let step = self.algo.input(input).expect("input");
|
||||
self.outputs.extend(step.output);
|
||||
}
|
||||
|
||||
/// Returns the internal algorithm's instance.
|
||||
|
@ -49,13 +49,12 @@ impl<D: DistAlgorithm> TestNode<D> {
|
|||
}
|
||||
|
||||
/// Creates a new test node with the given broadcast instance.
|
||||
fn new(mut algo: D) -> TestNode<D> {
|
||||
let outputs = algo.output_iter().collect();
|
||||
fn new(algo: D) -> TestNode<D> {
|
||||
TestNode {
|
||||
id: algo.our_id().clone(),
|
||||
algo,
|
||||
queue: VecDeque::new(),
|
||||
outputs,
|
||||
outputs: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -63,10 +62,11 @@ impl<D: DistAlgorithm> TestNode<D> {
|
|||
fn handle_message(&mut self) {
|
||||
let (from_id, msg) = self.queue.pop_front().expect("message not found");
|
||||
debug!("Handling {:?} -> {:?}: {:?}", from_id, self.id, msg);
|
||||
self.algo
|
||||
let step = self
|
||||
.algo
|
||||
.handle_message(&from_id, msg)
|
||||
.expect("handling message");
|
||||
self.outputs.extend(self.algo.output_iter());
|
||||
self.outputs.extend(step.output);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue