mirror of https://github.com/poanetwork/hbbft.git
changed Step::output to Vec but that didn't fix the dynamic HB test
This commit is contained in:
parent
1254d40147
commit
7fb1017bb1
|
@ -149,14 +149,14 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
|
||||||
let message = rx_to_algo.recv().expect("receive from algo");
|
let message = rx_to_algo.recv().expect("receive from algo");
|
||||||
let SourcedMessage { source: i, message } = message;
|
let SourcedMessage { source: i, message } = message;
|
||||||
debug!("{} received from {}: {:?}", our_id, i, message);
|
debug!("{} received from {}: {:?}", our_id, i, message);
|
||||||
broadcast
|
let step = broadcast
|
||||||
.handle_message(&i, message)
|
.handle_message(&i, message)
|
||||||
.expect("handle broadcast message");
|
.expect("handle broadcast message");
|
||||||
for msg in broadcast.message_iter() {
|
for msg in broadcast.message_iter() {
|
||||||
debug!("{} sending to {:?}: {:?}", our_id, msg.target, msg.message);
|
debug!("{} sending to {:?}: {:?}", our_id, msg.target, msg.message);
|
||||||
tx_from_algo.send(msg).expect("send from algo");
|
tx_from_algo.send(msg).expect("send from algo");
|
||||||
}
|
}
|
||||||
if let Some(output) = broadcast.next_output() {
|
if let Some(output) = step.output.into_iter().next() {
|
||||||
println!(
|
println!(
|
||||||
"Broadcast succeeded! Node {} output: {}",
|
"Broadcast succeeded! Node {} output: {}",
|
||||||
our_id,
|
our_id,
|
||||||
|
|
|
@ -24,7 +24,7 @@ use serde::Serialize;
|
||||||
use signifix::{metric, TryFrom};
|
use signifix::{metric, TryFrom};
|
||||||
|
|
||||||
use hbbft::crypto::SecretKeySet;
|
use hbbft::crypto::SecretKeySet;
|
||||||
use hbbft::messaging::{DistAlgorithm, NetworkInfo, Target};
|
use hbbft::messaging::{DistAlgorithm, NetworkInfo, Step, Target};
|
||||||
use hbbft::queueing_honey_badger::{Batch, QueueingHoneyBadger};
|
use hbbft::queueing_honey_badger::{Batch, QueueingHoneyBadger};
|
||||||
|
|
||||||
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||||
|
@ -153,7 +153,7 @@ where
|
||||||
message_size: 0,
|
message_size: 0,
|
||||||
hw_quality,
|
hw_quality,
|
||||||
};
|
};
|
||||||
node.send_output_and_msgs();
|
node.send_output_and_msgs(Step::default());
|
||||||
node
|
node
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,15 +165,19 @@ where
|
||||||
self.message_size += ts_msg.message.len() as u64;
|
self.message_size += ts_msg.message.len() as u64;
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let msg = bincode::deserialize::<D::Message>(&ts_msg.message).expect("deserialize");
|
let msg = bincode::deserialize::<D::Message>(&ts_msg.message).expect("deserialize");
|
||||||
self.algo
|
let step = self
|
||||||
|
.algo
|
||||||
.handle_message(&ts_msg.sender_id, msg)
|
.handle_message(&ts_msg.sender_id, msg)
|
||||||
.expect("handling message");
|
.expect("handling message");
|
||||||
self.time += start.elapsed() * self.hw_quality.cpu_factor / 100;
|
self.time += start.elapsed() * self.hw_quality.cpu_factor / 100;
|
||||||
self.send_output_and_msgs()
|
self.send_output_and_msgs(step)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handles the algorithm's output and messages.
|
/// Handles the algorithm's output and messages.
|
||||||
fn send_output_and_msgs(&mut self) {
|
fn send_output_and_msgs(
|
||||||
|
&mut self,
|
||||||
|
step: Step<<D as DistAlgorithm>::NodeUid, <D as DistAlgorithm>::Output>,
|
||||||
|
) {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let out_msgs: Vec<_> = self
|
let out_msgs: Vec<_> = self
|
||||||
.algo
|
.algo
|
||||||
|
@ -188,7 +192,7 @@ where
|
||||||
self.time += start.elapsed() * self.hw_quality.cpu_factor / 100;
|
self.time += start.elapsed() * self.hw_quality.cpu_factor / 100;
|
||||||
let time = self.time;
|
let time = self.time;
|
||||||
self.outputs
|
self.outputs
|
||||||
.extend(self.algo.output_iter().map(|out| (time, out)));
|
.extend(step.output.into_iter().map(|out| (time, out)));
|
||||||
self.sent_time = cmp::max(self.time, self.sent_time);
|
self.sent_time = cmp::max(self.time, self.sent_time);
|
||||||
for (target, message) in out_msgs {
|
for (target, message) in out_msgs {
|
||||||
self.sent_time += self.hw_quality.inv_bw * message.len() as u32;
|
self.sent_time += self.hw_quality.inv_bw * message.len() as u32;
|
||||||
|
|
|
@ -74,8 +74,8 @@ use itertools::Itertools;
|
||||||
|
|
||||||
use agreement::bin_values::BinValues;
|
use agreement::bin_values::BinValues;
|
||||||
use common_coin;
|
use common_coin;
|
||||||
use fault_log::FaultLog;
|
|
||||||
use common_coin::{CommonCoin, CommonCoinMessage, CommonCoinStep};
|
use common_coin::{CommonCoin, CommonCoinMessage, CommonCoinStep};
|
||||||
|
use fault_log::FaultLog;
|
||||||
use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage};
|
use messaging::{DistAlgorithm, NetworkInfo, Step, Target, TargetedMessage};
|
||||||
|
|
||||||
error_chain!{
|
error_chain!{
|
||||||
|
@ -197,8 +197,8 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
|
|
||||||
fn input(&mut self, input: Self::Input) -> AgreementResult<AgreementStep<NodeUid>> {
|
fn input(&mut self, input: Self::Input) -> AgreementResult<AgreementStep<NodeUid>> {
|
||||||
let fault_log = self.set_input(input);
|
let fault_log = self.set_input(input)?;
|
||||||
self.step().with_fault_log(fault_log)
|
self.step(fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receive input from a remote node.
|
/// Receive input from a remote node.
|
||||||
|
@ -207,23 +207,23 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for Agreement<NodeUid> {
|
||||||
sender_id: &Self::NodeUid,
|
sender_id: &Self::NodeUid,
|
||||||
message: Self::Message,
|
message: Self::Message,
|
||||||
) -> AgreementResult<AgreementStep<NodeUid>> {
|
) -> AgreementResult<AgreementStep<NodeUid>> {
|
||||||
if self.terminated || message.epoch < self.epoch {
|
let fault_log = if self.terminated || message.epoch < self.epoch {
|
||||||
// Message is obsolete: We are already in a later epoch or terminated.
|
// Message is obsolete: We are already in a later epoch or terminated.
|
||||||
return self.step();
|
FaultLog::default()
|
||||||
}
|
} else if message.epoch > self.epoch {
|
||||||
if message.epoch > self.epoch {
|
|
||||||
// Message is for a later epoch. We can't handle that yet.
|
// Message is for a later epoch. We can't handle that yet.
|
||||||
self.incoming_queue.push((sender_id.clone(), message));
|
self.incoming_queue.push((sender_id.clone(), message));
|
||||||
return self.step();
|
FaultLog::default()
|
||||||
}
|
} else {
|
||||||
let fault_log = match message.content {
|
match message.content {
|
||||||
AgreementContent::BVal(b) => self.handle_bval(sender_id, b)?,
|
AgreementContent::BVal(b) => self.handle_bval(sender_id, b)?,
|
||||||
AgreementContent::Aux(b) => self.handle_aux(sender_id, b)?,
|
AgreementContent::Aux(b) => self.handle_aux(sender_id, b)?,
|
||||||
AgreementContent::Conf(v) => self.handle_conf(sender_id, v)?,
|
AgreementContent::Conf(v) => self.handle_conf(sender_id, v)?,
|
||||||
AgreementContent::Term(v) => self.handle_term(sender_id, v)?.map(|()| FaultLog::new()),
|
AgreementContent::Term(v) => self.handle_term(sender_id, v)?,
|
||||||
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg)?,
|
AgreementContent::Coin(msg) => self.handle_coin(sender_id, *msg)?,
|
||||||
|
}
|
||||||
};
|
};
|
||||||
self.step().with_fault_log(fault_log)
|
self.step(fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Take the next Agreement message for multicast to all other nodes.
|
/// Take the next Agreement message for multicast to all other nodes.
|
||||||
|
@ -280,8 +280,11 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn step(&mut self) -> AgreementResult<AgreementStep<NodeUid>> {
|
fn step(&mut self, fault_log: FaultLog<NodeUid>) -> AgreementResult<AgreementStep<NodeUid>> {
|
||||||
Ok(Step::new(self.output.take()))
|
Ok(Step::new(
|
||||||
|
self.output.take().into_iter().collect(),
|
||||||
|
fault_log,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sets the input value for agreement.
|
/// Sets the input value for agreement.
|
||||||
|
@ -298,7 +301,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
// Set the initial estimated value to the input value.
|
// Set the initial estimated value to the input value.
|
||||||
self.estimated = Some(input);
|
self.estimated = Some(input);
|
||||||
// Record the input value as sent.
|
// Record the input value as sent.
|
||||||
self.send_bval(input);
|
self.send_bval(input)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -450,7 +453,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
/// Receives a `Term(v)` message. If we haven't yet decided on a value and there are more than
|
/// Receives a `Term(v)` message. If we haven't yet decided on a value and there are more than
|
||||||
/// `num_faulty` such messages with the same value from different nodes, performs expedite
|
/// `num_faulty` such messages with the same value from different nodes, performs expedite
|
||||||
/// termination: decides on `v`, broadcasts `Term(v)` and terminates the instance.
|
/// termination: decides on `v`, broadcasts `Term(v)` and terminates the instance.
|
||||||
fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> AgreementResult<()> {
|
fn handle_term(&mut self, sender_id: &NodeUid, b: bool) -> AgreementResult<FaultLog<NodeUid>> {
|
||||||
self.received_term.insert(sender_id.clone(), b);
|
self.received_term.insert(sender_id.clone(), b);
|
||||||
// Check for the expedite termination condition.
|
// Check for the expedite termination condition.
|
||||||
if self.decision.is_none()
|
if self.decision.is_none()
|
||||||
|
@ -459,7 +462,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
{
|
{
|
||||||
self.decide(b);
|
self.decide(b);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(FaultLog::new())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handles a Common Coin message. If there is output from Common Coin, starts the next
|
/// Handles a Common Coin message. If there is output from Common Coin, starts the next
|
||||||
|
@ -474,9 +477,13 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
self.on_coin_step(coin_step)
|
self.on_coin_step(coin_step)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_coin_step(&mut self, coin_step: CommonCoinStep<NodeUid>) -> AgreementResult<FaultLog<NodeUid>> {
|
fn on_coin_step(
|
||||||
|
&mut self,
|
||||||
|
coin_step: CommonCoinStep<NodeUid>,
|
||||||
|
) -> AgreementResult<FaultLog<NodeUid>> {
|
||||||
let mut fault_log = FaultLog::new();
|
let mut fault_log = FaultLog::new();
|
||||||
if let Some(coin) = coin_step.output {
|
fault_log.extend(coin_step.fault_log);
|
||||||
|
if let Some(coin) = coin_step.output.into_iter().next() {
|
||||||
let def_bin_value = self.count_conf().1.definite();
|
let def_bin_value = self.count_conf().1.definite();
|
||||||
fault_log.extend(self.on_coin(coin, def_bin_value)?);
|
fault_log.extend(self.on_coin(coin, def_bin_value)?);
|
||||||
}
|
}
|
||||||
|
@ -515,7 +522,7 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
let step = self.handle_message(&sender_id, msg)?;
|
let step = self.handle_message(&sender_id, msg)?;
|
||||||
fault_log.extend(step.fault_log);
|
fault_log.extend(step.fault_log);
|
||||||
// Save the output of the internal call.
|
// Save the output of the internal call.
|
||||||
self.output = step.output;
|
self.output = step.output.into_iter().next();
|
||||||
if self.terminated {
|
if self.terminated {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -567,20 +574,17 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_finish_conf_round(&mut self) -> AgreementResult<FaultLog<NodeUid>> {
|
fn try_finish_conf_round(&mut self) -> AgreementResult<FaultLog<NodeUid>> {
|
||||||
let mut fault_log = FaultLog::new();
|
if self.conf_round
|
||||||
if self.conf_round {
|
&& self.count_conf().0 >= self.netinfo.num_nodes() - self.netinfo.num_faulty()
|
||||||
let (count_vals, _) = self.count_conf();
|
{
|
||||||
if count_vals < self.netinfo.num_nodes() - self.netinfo.num_faulty() {
|
|
||||||
// Continue waiting for (N - f) `Conf` messages
|
|
||||||
return Ok(fault_log);
|
|
||||||
}
|
|
||||||
// Invoke the comon coin.
|
// Invoke the comon coin.
|
||||||
let coin_step = self.common_coin.input(())?;
|
let coin_step = self.common_coin.input(())?;
|
||||||
fault_log.extend(coin_step.fault_log);
|
|
||||||
self.extend_common_coin();
|
self.extend_common_coin();
|
||||||
self.on_coin_step(coin_step)?;
|
self.on_coin_step(coin_step)
|
||||||
|
} else {
|
||||||
|
// Continue waiting for (N - f) `Conf` messages
|
||||||
|
Ok(FaultLog::default())
|
||||||
}
|
}
|
||||||
Ok(fault_log)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_aux(&mut self, b: bool) -> AgreementResult<FaultLog<NodeUid>> {
|
fn send_aux(&mut self, b: bool) -> AgreementResult<FaultLog<NodeUid>> {
|
||||||
|
|
|
@ -44,7 +44,6 @@
|
||||||
use std::collections::{BTreeMap, VecDeque};
|
use std::collections::{BTreeMap, VecDeque};
|
||||||
use std::fmt::{self, Debug};
|
use std::fmt::{self, Debug};
|
||||||
use std::iter::once;
|
use std::iter::once;
|
||||||
use std::mem::replace;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use byteorder::{BigEndian, ByteOrder};
|
use byteorder::{BigEndian, ByteOrder};
|
||||||
|
@ -140,7 +139,7 @@ impl<NodeUid: Debug + Clone + Ord> DistAlgorithm for Broadcast<NodeUid> {
|
||||||
let proof = self.send_shards(input)?;
|
let proof = self.send_shards(input)?;
|
||||||
let our_uid = &self.netinfo.our_uid().clone();
|
let our_uid = &self.netinfo.our_uid().clone();
|
||||||
let fault_log = self.handle_value(our_uid, proof)?;
|
let fault_log = self.handle_value(our_uid, proof)?;
|
||||||
self.step().with_fault_log(fault_log)
|
self.step(fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_message(
|
fn handle_message(
|
||||||
|
@ -155,10 +154,10 @@ impl<NodeUid: Debug + Clone + Ord> DistAlgorithm for Broadcast<NodeUid> {
|
||||||
BroadcastMessage::Value(p) => self.handle_value(sender_id, p)?,
|
BroadcastMessage::Value(p) => self.handle_value(sender_id, p)?,
|
||||||
BroadcastMessage::Echo(p) => self.handle_echo(sender_id, p)?,
|
BroadcastMessage::Echo(p) => self.handle_echo(sender_id, p)?,
|
||||||
BroadcastMessage::Ready(ref hash) => self
|
BroadcastMessage::Ready(ref hash) => self
|
||||||
.handle_ready(sender_id, hash)?
|
.handle_ready(sender_id, hash)
|
||||||
.map(|()| FaultLog::new()),
|
.map(|()| FaultLog::new())?,
|
||||||
};
|
};
|
||||||
self.step().with_fault_log(fault_log)
|
self.step(fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
|
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
|
||||||
|
@ -197,8 +196,11 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn step(&mut self) -> BroadcastResult<BroadcastStep<NodeUid>> {
|
fn step(&mut self, fault_log: FaultLog<NodeUid>) -> BroadcastResult<BroadcastStep<NodeUid>> {
|
||||||
Ok(Step::new(replace(&mut self.output, None)))
|
Ok(Step::new(
|
||||||
|
self.output.take().into_iter().collect(),
|
||||||
|
fault_log,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Breaks the input value into shards of equal length and encodes them --
|
/// Breaks the input value into shards of equal length and encodes them --
|
||||||
|
|
|
@ -99,7 +99,7 @@ where
|
||||||
} else {
|
} else {
|
||||||
FaultLog::new()
|
FaultLog::new()
|
||||||
};
|
};
|
||||||
self.step().with_fault_log(fault_log)
|
self.step(fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receives input from a remote node.
|
/// Receives input from a remote node.
|
||||||
|
@ -114,7 +114,7 @@ where
|
||||||
} else {
|
} else {
|
||||||
FaultLog::default()
|
FaultLog::default()
|
||||||
};
|
};
|
||||||
self.step().with_fault_log(fault_log)
|
self.step(fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Takes the next share of a threshold signature message for multicasting to all other nodes.
|
/// Takes the next share of a threshold signature message for multicasting to all other nodes.
|
||||||
|
@ -151,8 +151,11 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn step(&mut self) -> Result<CommonCoinStep<NodeUid>> {
|
fn step(&mut self, fault_log: FaultLog<NodeUid>) -> Result<CommonCoinStep<NodeUid>> {
|
||||||
Ok(Step::new(self.output.take()))
|
Ok(Step::new(
|
||||||
|
self.output.take().into_iter().collect(),
|
||||||
|
fault_log,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_coin(&mut self) -> Result<FaultLog<NodeUid>> {
|
fn get_coin(&mut self) -> Result<FaultLog<NodeUid>> {
|
||||||
|
|
|
@ -25,7 +25,6 @@
|
||||||
|
|
||||||
use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::mem::replace;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use agreement::{self, Agreement, AgreementMessage, AgreementStep};
|
use agreement::{self, Agreement, AgreementMessage, AgreementStep};
|
||||||
|
@ -118,7 +117,7 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for CommonSubset<NodeUid> {
|
||||||
HexBytes(&input)
|
HexBytes(&input)
|
||||||
);
|
);
|
||||||
let fault_log = self.send_proposed_value(input)?;
|
let fault_log = self.send_proposed_value(input)?;
|
||||||
self.step().with_fault_log(fault_log)
|
self.step(fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_message(
|
fn handle_message(
|
||||||
|
@ -130,7 +129,7 @@ impl<NodeUid: Clone + Debug + Ord> DistAlgorithm for CommonSubset<NodeUid> {
|
||||||
Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg)?,
|
Message::Broadcast(p_id, b_msg) => self.handle_broadcast(sender_id, &p_id, b_msg)?,
|
||||||
Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, a_msg)?,
|
Message::Agreement(p_id, a_msg) => self.handle_agreement(sender_id, &p_id, a_msg)?,
|
||||||
};
|
};
|
||||||
self.step().with_fault_log(fault_log)
|
self.step(fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, Self::NodeUid>> {
|
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, Self::NodeUid>> {
|
||||||
|
@ -178,8 +177,14 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn step(&mut self) -> CommonSubsetResult<CommonSubsetStep<NodeUid>> {
|
fn step(
|
||||||
Ok(Step::new(replace(&mut self.output, None)))
|
&mut self,
|
||||||
|
fault_log: FaultLog<NodeUid>,
|
||||||
|
) -> CommonSubsetResult<CommonSubsetStep<NodeUid>> {
|
||||||
|
Ok(Step::new(
|
||||||
|
self.output.take().into_iter().collect(),
|
||||||
|
fault_log,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Common Subset input message handler. It receives a value for broadcast
|
/// Common Subset input message handler. It receives a value for broadcast
|
||||||
|
@ -240,7 +245,7 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
||||||
let step = f(broadcast)?;
|
let step = f(broadcast)?;
|
||||||
fault_log.extend(step.fault_log);
|
fault_log.extend(step.fault_log);
|
||||||
self.messages.extend_broadcast(&proposer_id, broadcast);
|
self.messages.extend_broadcast(&proposer_id, broadcast);
|
||||||
if let Some(output) = step.output {
|
if let Some(output) = step.output.into_iter().next() {
|
||||||
output
|
output
|
||||||
} else {
|
} else {
|
||||||
return Ok(fault_log);
|
return Ok(fault_log);
|
||||||
|
@ -282,7 +287,7 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
||||||
let step = f(agreement)?;
|
let step = f(agreement)?;
|
||||||
fault_log.extend(step.fault_log);
|
fault_log.extend(step.fault_log);
|
||||||
self.messages.extend_agreement(proposer_id, agreement);
|
self.messages.extend_agreement(proposer_id, agreement);
|
||||||
if let Some(output) = step.output {
|
if let Some(output) = step.output.into_iter().next() {
|
||||||
output
|
output
|
||||||
} else {
|
} else {
|
||||||
return Ok(fault_log);
|
return Ok(fault_log);
|
||||||
|
@ -306,10 +311,10 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
|
||||||
// input 0 to each instance of BA that has not yet been provided input.
|
// input 0 to each instance of BA that has not yet been provided input.
|
||||||
for (uid, agreement) in &mut self.agreement_instances {
|
for (uid, agreement) in &mut self.agreement_instances {
|
||||||
if agreement.accepts_input() {
|
if agreement.accepts_input() {
|
||||||
let step = agreement.set_input(false)?;
|
let step = agreement.input(false)?;
|
||||||
fault_log.extend(step.fault_log);
|
fault_log.extend(step.fault_log);
|
||||||
self.messages.extend_agreement(uid, agreement);
|
self.messages.extend_agreement(uid, agreement);
|
||||||
if let Some(output) = step.output {
|
if let Some(output) = step.output.into_iter().next() {
|
||||||
if self.agreement_results.insert(uid.clone(), output).is_some() {
|
if self.agreement_results.insert(uid.clone(), output).is_some() {
|
||||||
return Err(ErrorKind::MultipleAgreementResults.into());
|
return Err(ErrorKind::MultipleAgreementResults.into());
|
||||||
}
|
}
|
||||||
|
|
|
@ -131,7 +131,7 @@ where
|
||||||
Input::User(contrib) => self.propose(contrib)?,
|
Input::User(contrib) => self.propose(contrib)?,
|
||||||
Input::Change(change) => self.vote_for(change).map(|()| FaultLog::new())?,
|
Input::Change(change) => self.vote_for(change).map(|()| FaultLog::new())?,
|
||||||
};
|
};
|
||||||
self.step().with_fault_log(fault_log)
|
self.step(fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_message(
|
fn handle_message(
|
||||||
|
@ -161,7 +161,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
self.step().with_fault_log(fault_log)
|
self.step(fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
|
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
|
||||||
|
@ -182,8 +182,8 @@ where
|
||||||
C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
|
C: Eq + Serialize + for<'r> Deserialize<'r> + Debug + Hash,
|
||||||
NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash,
|
NodeUid: Eq + Ord + Clone + Debug + Serialize + for<'r> Deserialize<'r> + Hash,
|
||||||
{
|
{
|
||||||
fn step(&mut self) -> Result<DynamicHoneyBadgerStep<C, NodeUid>> {
|
fn step(&mut self, fault_log: FaultLog<NodeUid>) -> Result<DynamicHoneyBadgerStep<C, NodeUid>> {
|
||||||
Ok(Step::new(self.output.drain(0..).collect()))
|
Ok(Step::new(self.output.drain(0..).collect(), fault_log))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a new `DynamicHoneyBadgerBuilder` configured to use the node IDs and cryptographic
|
/// Returns a new `DynamicHoneyBadgerBuilder` configured to use the node IDs and cryptographic
|
||||||
|
@ -199,7 +199,6 @@ where
|
||||||
|
|
||||||
/// Proposes a contribution in the current epoch.
|
/// Proposes a contribution in the current epoch.
|
||||||
pub fn propose(&mut self, contrib: C) -> Result<FaultLog<NodeUid>> {
|
pub fn propose(&mut self, contrib: C) -> Result<FaultLog<NodeUid>> {
|
||||||
let mut fault_log = FaultLog::new();
|
|
||||||
let step = self.honey_badger.input(InternalContrib {
|
let step = self.honey_badger.input(InternalContrib {
|
||||||
contrib,
|
contrib,
|
||||||
key_gen_messages: self.key_gen_msg_buffer.clone(),
|
key_gen_messages: self.key_gen_msg_buffer.clone(),
|
||||||
|
@ -252,7 +251,7 @@ where
|
||||||
self.key_gen_msg_buffer.push(tx);
|
self.key_gen_msg_buffer.push(tx);
|
||||||
// FIXME: Remove the call to `process_output`. There wasn't any output from HB in this
|
// FIXME: Remove the call to `process_output`. There wasn't any output from HB in this
|
||||||
// function.
|
// function.
|
||||||
self.process_output()
|
self.process_output(Default::default())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Processes all pending batches output by Honey Badger.
|
/// Processes all pending batches output by Honey Badger.
|
||||||
|
@ -263,7 +262,7 @@ where
|
||||||
let mut fault_log = FaultLog::new();
|
let mut fault_log = FaultLog::new();
|
||||||
fault_log.extend(step.fault_log);
|
fault_log.extend(step.fault_log);
|
||||||
let start_epoch = self.start_epoch;
|
let start_epoch = self.start_epoch;
|
||||||
while let Some(hb_batch) = step.output.iter().next() {
|
for hb_batch in step.output {
|
||||||
// Create the batch we output ourselves. It will contain the _user_ transactions of
|
// Create the batch we output ourselves. It will contain the _user_ transactions of
|
||||||
// `hb_batch`, and the current change state.
|
// `hb_batch`, and the current change state.
|
||||||
let mut batch = Batch::new(hb_batch.epoch + self.start_epoch);
|
let mut batch = Batch::new(hb_batch.epoch + self.start_epoch);
|
||||||
|
|
|
@ -3,7 +3,6 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::hash::Hash;
|
use std::hash::Hash;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::mem::replace;
|
|
||||||
use std::ops::Not;
|
use std::ops::Not;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
@ -73,7 +72,7 @@ where
|
||||||
common_subsets: BTreeMap::new(),
|
common_subsets: BTreeMap::new(),
|
||||||
max_future_epochs: self.max_future_epochs as u64,
|
max_future_epochs: self.max_future_epochs as u64,
|
||||||
messages: MessageQueue(VecDeque::new()),
|
messages: MessageQueue(VecDeque::new()),
|
||||||
output: None,
|
output: Vec::new(),
|
||||||
incoming_queue: BTreeMap::new(),
|
incoming_queue: BTreeMap::new(),
|
||||||
received_shares: BTreeMap::new(),
|
received_shares: BTreeMap::new(),
|
||||||
decrypted_contributions: BTreeMap::new(),
|
decrypted_contributions: BTreeMap::new(),
|
||||||
|
@ -97,8 +96,8 @@ pub struct HoneyBadger<C, NodeUid> {
|
||||||
max_future_epochs: u64,
|
max_future_epochs: u64,
|
||||||
/// The messages that need to be sent to other nodes.
|
/// The messages that need to be sent to other nodes.
|
||||||
messages: MessageQueue<NodeUid>,
|
messages: MessageQueue<NodeUid>,
|
||||||
/// The output from a completed epoch.
|
/// The outputs from completed epochs.
|
||||||
output: Option<Batch<C, NodeUid>>,
|
pub(crate) output: Vec<Batch<C, NodeUid>>,
|
||||||
/// Messages for future epochs that couldn't be handled yet.
|
/// Messages for future epochs that couldn't be handled yet.
|
||||||
incoming_queue: BTreeMap<u64, Vec<(NodeUid, MessageContent<NodeUid>)>>,
|
incoming_queue: BTreeMap<u64, Vec<(NodeUid, MessageContent<NodeUid>)>>,
|
||||||
/// Received decryption shares for an epoch. Each decryption share has a sender and a
|
/// Received decryption shares for an epoch. Each decryption share has a sender and a
|
||||||
|
@ -126,7 +125,7 @@ where
|
||||||
|
|
||||||
fn input(&mut self, input: Self::Input) -> HoneyBadgerResult<HoneyBadgerStep<C, NodeUid>> {
|
fn input(&mut self, input: Self::Input) -> HoneyBadgerResult<HoneyBadgerStep<C, NodeUid>> {
|
||||||
let fault_log = self.propose(&input)?;
|
let fault_log = self.propose(&input)?;
|
||||||
self.step().with_fault_log(fault_log)
|
self.step(fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_message(
|
fn handle_message(
|
||||||
|
@ -148,7 +147,7 @@ where
|
||||||
} else if epoch == self.epoch {
|
} else if epoch == self.epoch {
|
||||||
fault_log.extend(self.handle_message_content(sender_id, epoch, content)?);
|
fault_log.extend(self.handle_message_content(sender_id, epoch, content)?);
|
||||||
} // And ignore all messages from past epochs.
|
} // And ignore all messages from past epochs.
|
||||||
self.step().with_fault_log(fault_log)
|
self.step(fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
|
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
|
||||||
|
@ -175,8 +174,11 @@ where
|
||||||
HoneyBadgerBuilder::new(netinfo)
|
HoneyBadgerBuilder::new(netinfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn step(&mut self) -> HoneyBadgerResult<HoneyBadgerStep<C, NodeUid>> {
|
fn step(
|
||||||
Ok(Step::new(replace(&mut self.output, Default::default())))
|
&mut self,
|
||||||
|
fault_log: FaultLog<NodeUid>,
|
||||||
|
) -> HoneyBadgerResult<HoneyBadgerStep<C, NodeUid>> {
|
||||||
|
Ok(Step::new(self.output.drain(0..).collect(), fault_log))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Proposes a new item in the current epoch.
|
/// Proposes a new item in the current epoch.
|
||||||
|
@ -192,10 +194,11 @@ where
|
||||||
};
|
};
|
||||||
let ser_prop = bincode::serialize(&proposal)?;
|
let ser_prop = bincode::serialize(&proposal)?;
|
||||||
let ciphertext = self.netinfo.public_key_set().public_key().encrypt(ser_prop);
|
let ciphertext = self.netinfo.public_key_set().public_key().encrypt(ser_prop);
|
||||||
let fault_log = cs.input(bincode::serialize(&ciphertext).unwrap())?;
|
let step = cs.input(bincode::serialize(&ciphertext).unwrap())?;
|
||||||
|
// FIXME: Use `step.output`.
|
||||||
self.has_input = true;
|
self.has_input = true;
|
||||||
self.messages.extend_with_epoch(self.epoch, cs);
|
self.messages.extend_with_epoch(self.epoch, cs);
|
||||||
Ok(fault_log)
|
Ok(step.fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns `true` if input for the current epoch has already been provided.
|
/// Returns `true` if input for the current epoch has already been provided.
|
||||||
|
@ -242,10 +245,9 @@ where
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
// Handle the message and put the outgoing messages into the queue.
|
// Handle the message and put the outgoing messages into the queue.
|
||||||
let step = cs.handle_message(sender_id, message)?;
|
let cs_step = cs.handle_message(sender_id, message)?;
|
||||||
fault_log.extend(step.fault_log);
|
|
||||||
self.messages.extend_with_epoch(epoch, cs);
|
self.messages.extend_with_epoch(epoch, cs);
|
||||||
step
|
cs_step
|
||||||
};
|
};
|
||||||
// If this is the current epoch, the message could cause a new output.
|
// If this is the current epoch, the message could cause a new output.
|
||||||
if epoch == self.epoch {
|
if epoch == self.epoch {
|
||||||
|
@ -347,7 +349,7 @@ where
|
||||||
batch.contributions
|
batch.contributions
|
||||||
);
|
);
|
||||||
// Queue the output and advance the epoch.
|
// Queue the output and advance the epoch.
|
||||||
self.output = Some(batch);
|
self.output.push(batch);
|
||||||
fault_log.extend(self.update_epoch()?);
|
fault_log.extend(self.update_epoch()?);
|
||||||
Ok(fault_log)
|
Ok(fault_log)
|
||||||
}
|
}
|
||||||
|
@ -552,7 +554,8 @@ where
|
||||||
step: CommonSubsetStep<NodeUid>,
|
step: CommonSubsetStep<NodeUid>,
|
||||||
) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
) -> HoneyBadgerResult<FaultLog<NodeUid>> {
|
||||||
let mut fault_log = FaultLog::new();
|
let mut fault_log = FaultLog::new();
|
||||||
if let Some(cs_output) = step.output {
|
fault_log.extend(step.fault_log);
|
||||||
|
for cs_output in step.output {
|
||||||
fault_log.extend(self.send_decryption_shares(cs_output)?);
|
fault_log.extend(self.send_decryption_shares(cs_output)?);
|
||||||
// TODO: May also check that there is no further output from Common Subset.
|
// TODO: May also check that there is no further output from Common Subset.
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use std::collections::{BTreeMap, BTreeSet};
|
use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
|
|
||||||
use clear_on_drop::ClearOnDrop;
|
use clear_on_drop::ClearOnDrop;
|
||||||
|
@ -57,7 +57,7 @@ pub struct Step<N, O>
|
||||||
where
|
where
|
||||||
N: Clone,
|
N: Clone,
|
||||||
{
|
{
|
||||||
pub output: Option<O>,
|
pub output: VecDeque<O>,
|
||||||
pub fault_log: FaultLog<N>,
|
pub fault_log: FaultLog<N>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,7 +67,7 @@ where
|
||||||
{
|
{
|
||||||
fn default() -> Step<N, O> {
|
fn default() -> Step<N, O> {
|
||||||
Step {
|
Step {
|
||||||
output: None,
|
output: Default::default(),
|
||||||
fault_log: FaultLog::default(),
|
fault_log: FaultLog::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -77,17 +77,12 @@ impl<N, O> Step<N, O>
|
||||||
where
|
where
|
||||||
N: Clone,
|
N: Clone,
|
||||||
{
|
{
|
||||||
pub fn new(output: Option<O>) -> Self {
|
pub fn new(output: VecDeque<O>, fault_log: FaultLog<N>) -> Self {
|
||||||
Step {
|
Step {
|
||||||
output,
|
output,
|
||||||
fault_log: FaultLog::default(),
|
fault_log,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_fault_log(&mut self, fault_log: FaultLog<N>) -> &mut Self {
|
|
||||||
self.fault_log = fault_log;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A distributed algorithm that defines a message flow.
|
/// A distributed algorithm that defines a message flow.
|
||||||
|
|
|
@ -148,7 +148,7 @@ where
|
||||||
step.fault_log
|
step.fault_log
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
self.step().with_fault_log(fault_log)
|
self.step(fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_message(
|
fn handle_message(
|
||||||
|
@ -157,16 +157,16 @@ where
|
||||||
message: Self::Message,
|
message: Self::Message,
|
||||||
) -> Result<QueueingHoneyBadgerStep<Tx, NodeUid>> {
|
) -> Result<QueueingHoneyBadgerStep<Tx, NodeUid>> {
|
||||||
let step = self.dyn_hb.handle_message(sender_id, message)?;
|
let step = self.dyn_hb.handle_message(sender_id, message)?;
|
||||||
let fault_log = FaultLog::new();
|
let mut fault_log = FaultLog::new();
|
||||||
fault_log.extend(step.fault_log);
|
fault_log.extend(step.fault_log);
|
||||||
if let Some(batch) = step.output {
|
for batch in step.output {
|
||||||
self.queue.remove_all(batch.iter());
|
self.queue.remove_all(batch.iter());
|
||||||
self.output.push_back(batch);
|
self.output.push_back(batch);
|
||||||
}
|
}
|
||||||
if !self.dyn_hb.has_input() {
|
if !self.dyn_hb.has_input() {
|
||||||
fault_log.extend(self.propose()?);
|
fault_log.extend(self.propose()?);
|
||||||
}
|
}
|
||||||
self.step().with_fault_log(fault_log)
|
self.step(fault_log)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
|
fn next_message(&mut self) -> Option<TargetedMessage<Self::Message, NodeUid>> {
|
||||||
|
@ -193,8 +193,11 @@ where
|
||||||
QueueingHoneyBadgerBuilder::new(netinfo)
|
QueueingHoneyBadgerBuilder::new(netinfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn step(&mut self) -> Result<QueueingHoneyBadgerStep<Tx, NodeUid>> {
|
fn step(
|
||||||
Ok(Step::new(self.output.take()))
|
&mut self,
|
||||||
|
fault_log: FaultLog<NodeUid>,
|
||||||
|
) -> Result<QueueingHoneyBadgerStep<Tx, NodeUid>> {
|
||||||
|
Ok(Step::new(self.output.drain(0..).collect(), fault_log))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Initiates the next epoch by proposing a batch from the queue.
|
/// Initiates the next epoch by proposing a batch from the queue.
|
||||||
|
|
|
@ -66,6 +66,10 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
debug!(
|
||||||
|
"{:?} min_missing {}, num_txs {}",
|
||||||
|
node.id, min_missing, num_txs
|
||||||
|
);
|
||||||
if min_missing < num_txs {
|
if min_missing < num_txs {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,7 @@ pub struct NodeUid(pub usize);
|
||||||
/// A "node" running an instance of the algorithm `D`.
|
/// A "node" running an instance of the algorithm `D`.
|
||||||
pub struct TestNode<D: DistAlgorithm> {
|
pub struct TestNode<D: DistAlgorithm> {
|
||||||
/// This node's own ID.
|
/// This node's own ID.
|
||||||
id: D::NodeUid,
|
pub id: D::NodeUid,
|
||||||
/// The instance of the broadcast algorithm.
|
/// The instance of the broadcast algorithm.
|
||||||
algo: D,
|
algo: D,
|
||||||
/// Incoming messages from other nodes that this node has not yet handled.
|
/// Incoming messages from other nodes that this node has not yet handled.
|
||||||
|
|
Loading…
Reference in New Issue