Allow observer nodes in all algorithms.

This allows nodes to join the network without sending any messages
themselves. They can't give any input and just observe the outcome.

Closes #81
This commit is contained in:
Andreas Fackler 2018-06-25 21:09:45 +02:00
parent b3b3994ec1
commit 2a5f9f1bfe
11 changed files with 118 additions and 66 deletions

View File

@ -227,12 +227,14 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
self.decision = Some(input);
self.output = Some(input);
self.terminated = true;
self.send_bval(input)?;
self.send_aux(input)
} else {
// Set the initial estimated value to the input value.
self.estimated = Some(input);
// Record the input value as sent.
self.send_bval(input)
}
// Set the initial estimated value to the input value.
self.estimated = Some(input);
// Record the input value as sent.
self.send_bval(input)
}
/// Acceptance check to be performed before setting the input value.
@ -305,6 +307,9 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
}
fn send_bval(&mut self, b: bool) -> AgreementResult<()> {
if !self.netinfo.is_full_node() {
return Ok(());
}
// Record the value `b` as sent.
self.sent_bval.insert(b);
// Multicast `BVal`.
@ -321,12 +326,17 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
return Ok(());
}
// Trigger the start of the `Conf` round.
self.conf_round = true;
if !self.netinfo.is_full_node() {
return Ok(());
}
let v = self.bin_values;
// Multicast `Conf`.
self.messages
.push_back(AgreementContent::Conf(v).with_epoch(self.epoch));
// Trigger the start of the `Conf` round.
self.conf_round = true;
// Receive the `Conf` message locally.
let our_uid = &self.netinfo.our_uid().clone();
self.handle_conf(our_uid, v)

View File

@ -293,10 +293,12 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
// Otherwise multicast the proof in an `Echo` message, and handle it ourselves.
self.echo_sent = true;
let our_uid = &self.netinfo.our_uid().clone();
self.handle_echo(our_uid, p.clone())?;
let echo_msg = Target::All.message(BroadcastMessage::Echo(p));
self.messages.push_back(echo_msg);
if self.netinfo.is_full_node() {
let our_uid = &self.netinfo.our_uid().clone();
self.handle_echo(our_uid, p.clone())?;
let echo_msg = Target::All.message(BroadcastMessage::Echo(p));
self.messages.push_back(echo_msg);
}
Ok(())
}
@ -328,10 +330,13 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
// Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`.
self.ready_sent = true;
let ready_msg = Target::All.message(BroadcastMessage::Ready(hash.clone()));
self.messages.push_back(ready_msg);
let our_uid = &self.netinfo.our_uid().clone();
self.handle_ready(our_uid, &hash)
if self.netinfo.is_full_node() {
let ready_msg = Target::All.message(BroadcastMessage::Ready(hash.clone()));
self.messages.push_back(ready_msg);
let our_uid = &self.netinfo.our_uid().clone();
self.handle_ready(our_uid, &hash)?;
}
Ok(())
}
/// Handles a received `Ready` message.
@ -353,8 +358,10 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
if self.count_readys(hash) == self.netinfo.num_faulty() + 1 && !self.ready_sent {
// Enqueue a broadcast of a Ready message.
self.ready_sent = true;
let ready_msg = Target::All.message(BroadcastMessage::Ready(hash.to_vec()));
self.messages.push_back(ready_msg);
if self.netinfo.is_full_node() {
let ready_msg = Target::All.message(BroadcastMessage::Ready(hash.to_vec()));
self.messages.push_back(ready_msg);
}
}
self.compute_output(&hash)
}

View File

@ -126,6 +126,9 @@ where
}
fn get_coin(&mut self) -> Result<()> {
if !self.netinfo.is_full_node() {
return self.try_output();
}
let share = self.netinfo.secret_key().sign(&self.nonce);
self.messages.push_back(CommonCoinMessage(share.clone()));
let id = self.netinfo.our_uid().clone();
@ -140,18 +143,22 @@ where
return Ok(());
}
self.received_shares.insert(sender_id.clone(), share);
let received_shares = &self.received_shares;
if self.had_input && received_shares.len() > self.netinfo.num_faulty() {
let sig = self.combine_and_verify_sig()?;
// Output the parity of the verified signature.
let parity = sig.parity();
self.output = Some(parity);
self.terminated = true;
}
Ok(())
} else {
Err(ErrorKind::UnknownSender.into())
return Err(ErrorKind::UnknownSender.into());
}
self.try_output()
}
fn try_output(&mut self) -> Result<()> {
let received_shares = &self.received_shares;
if self.had_input && received_shares.len() > self.netinfo.num_faulty() {
let sig = self.combine_and_verify_sig()?;
// Output the parity of the verified signature.
let parity = sig.parity();
self.output = Some(parity);
self.terminated = true;
}
Ok(())
}
fn combine_and_verify_sig(&self) -> Result<Signature> {

View File

@ -180,6 +180,9 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
/// Common Subset input message handler. It receives a value for broadcast
/// and redirects it to the corresponding broadcast instance.
pub fn send_proposed_value(&mut self, value: ProposedValue) -> CommonSubsetResult<()> {
if !self.netinfo.is_full_node() {
return Ok(());
}
let uid = self.netinfo.our_uid().clone();
// Upon receiving input v_i , input v_i to RBC_i. See Figure 2.
self.process_broadcast(&uid, |bc| bc.input(value))

View File

@ -28,6 +28,7 @@ error_chain!{
errors {
UnknownSender
ObserverCannotPropose
}
}
@ -151,8 +152,12 @@ where
&mut self,
txs: I,
) -> HoneyBadgerResult<()> {
self.buffer.extend(txs);
Ok(())
if self.netinfo.is_full_node() {
self.buffer.extend(txs);
Ok(())
} else {
Err(ErrorKind::ObserverCannotPropose.into())
}
}
/// Empties and returns the transaction buffer.
@ -162,6 +167,9 @@ where
/// Proposes a new batch in the current epoch.
fn propose(&mut self) -> HoneyBadgerResult<()> {
if !self.netinfo.is_full_node() {
return Ok(());
}
let proposal = self.choose_transactions()?;
let cs = match self.common_subsets.entry(self.epoch) {
Entry::Occupied(entry) => entry.into_mut(),
@ -420,24 +428,31 @@ where
self.verify_pending_decryption_shares(&proposer_id, &ciphertext);
self.remove_incorrect_decryption_shares(&proposer_id, incorrect_senders);
if let Some(share) = self.netinfo.secret_key().decrypt_share(&ciphertext) {
// Send the share to remote nodes.
self.messages.0.push_back(
Target::All.message(
MessageContent::DecryptionShare {
proposer_id: proposer_id.clone(),
share: share.clone(),
}.with_epoch(self.epoch),
),
);
let our_id = self.netinfo.our_uid().clone();
let epoch = self.epoch;
// Receive the share locally.
self.handle_decryption_share_message(&our_id, epoch, proposer_id.clone(), share)?;
} else {
warn!("Share decryption failed for proposer {:?}", proposer_id);
// TODO: Log the decryption failure.
continue;
if self.netinfo.is_full_node() {
if let Some(share) = self.netinfo.secret_key().decrypt_share(&ciphertext) {
// Send the share to remote nodes.
self.messages.0.push_back(
Target::All.message(
MessageContent::DecryptionShare {
proposer_id: proposer_id.clone(),
share: share.clone(),
}.with_epoch(self.epoch),
),
);
let our_id = self.netinfo.our_uid().clone();
let epoch = self.epoch;
// Receive the share locally.
self.handle_decryption_share_message(
&our_id,
epoch,
proposer_id.clone(),
share,
)?;
} else {
warn!("Share decryption failed for proposer {:?}", proposer_id);
// TODO: Log the decryption failure.
continue;
}
}
let ciphertexts = self
.ciphertexts

View File

@ -154,9 +154,6 @@ impl<NodeUid: Clone + Ord> NetworkInfo<NodeUid> {
secret_key: ClearOnDrop<Box<SecretKey>>,
public_key_set: PublicKeySet,
) -> Self {
if !all_uids.contains(&our_uid) {
panic!("Missing own ID");
}
let num_nodes = all_uids.len();
let node_indices = all_uids
.iter()
@ -218,4 +215,10 @@ impl<NodeUid: Clone + Ord> NetworkInfo<NodeUid> {
pub fn invocation_id(&self) -> Vec<u8> {
self.public_key_set.public_key().to_bytes()
}
/// Returns `true` if this node takes part in the consensus itself. If not, it is only an
/// observer.
pub fn is_full_node(&self) -> bool {
self.all_uids.contains(&self.our_uid)
}
}

View File

@ -57,6 +57,7 @@ fn test_agreement<A: Adversary<Agreement<NodeUid>>>(
expected = Some(node.outputs()[0]);
}
}
assert!(expected.iter().eq(network.observer.outputs()));
}
fn test_agreement_different_sizes<A, F>(new_adversary: F)

View File

@ -107,6 +107,7 @@ fn test_broadcast<A: Adversary<Broadcast<NodeUid>>>(
for node in network.nodes.values() {
assert!(once(&proposed_value.to_vec()).eq(node.outputs()));
}
assert!(once(&proposed_value.to_vec()).eq(network.observer.outputs()));
}
fn new_broadcast(netinfo: Rc<NetworkInfo<NodeUid>>) -> Broadcast<NodeUid> {

View File

@ -21,23 +21,18 @@ use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork
/// Tests a network of Common Coin instances with an optional expected value. Outputs the computed
/// common coin value if the test is successful.
fn test_common_coin<A>(
mut network: TestNetwork<A, CommonCoin<NodeUid, String>>,
expected_coin: Option<bool>,
) -> bool
fn test_common_coin<A>(mut network: TestNetwork<A, CommonCoin<NodeUid, String>>) -> bool
where
A: Adversary<CommonCoin<NodeUid, String>>,
{
let ids: Vec<NodeUid> = network.nodes.keys().cloned().collect();
for id in ids {
network.input(id, ());
}
network.input_all(());
network.observer.input(()); // Observer will only return after `input` was called.
// Handle messages until all good nodes have terminated.
while !network.nodes.values().all(TestNode::terminated) {
network.step();
}
let mut expected = expected_coin;
let mut expected = None;
// Verify that all instances output the same value.
for node in network.nodes.values() {
if let Some(b) = expected {
@ -48,6 +43,7 @@ where
}
}
// Now `expected` is the unique output of all good nodes.
assert!(expected.iter().eq(network.observer.outputs()));
expected.unwrap()
}
@ -109,7 +105,7 @@ where
let new_common_coin = |netinfo: _| CommonCoin::new(netinfo, nonce.clone());
let network =
TestNetwork::new(num_good_nodes, num_faulty_nodes, adversary, new_common_coin);
let coin = test_common_coin(network, None);
let coin = test_common_coin(network);
if coin {
count_true += 1;
} else {

View File

@ -49,6 +49,7 @@ fn test_common_subset<A: Adversary<CommonSubset<NodeUid>>>(
expected = Some(node.outputs()[0].clone());
}
let output = expected.unwrap();
assert!(once(&output).eq(network.observer.outputs()));
// The Common Subset algorithm guarantees that more than two thirds of the proposed elements
// are in the set.
assert!(output.len() * 3 > inputs.len() * 2);

View File

@ -36,6 +36,12 @@ impl<D: DistAlgorithm> TestNode<D> {
self.algo.terminated()
}
/// Inputs a value into the instance.
pub fn input(&mut self, input: D::Input) {
self.algo.input(input).expect("input");
self.outputs.extend(self.algo.output_iter());
}
/// Creates a new test node with the given broadcast instance.
fn new(mut algo: D) -> TestNode<D> {
let outputs = algo.output_iter().collect();
@ -56,12 +62,6 @@ impl<D: DistAlgorithm> TestNode<D> {
.expect("handling message");
self.outputs.extend(self.algo.output_iter());
}
/// Inputs a value into the instance.
fn input(&mut self, input: D::Input) {
self.algo.input(input).expect("input");
self.outputs.extend(self.algo.output_iter());
}
}
/// A strategy for picking the next good node to handle a message.
@ -148,6 +148,7 @@ where
<D as DistAlgorithm>::NodeUid: Hash,
{
pub nodes: BTreeMap<D::NodeUid, TestNode<D>>,
pub observer: TestNode<D>,
pub adv_nodes: BTreeMap<D::NodeUid, Rc<NetworkInfo<D::NodeUid>>>,
adversary: A,
}
@ -202,6 +203,7 @@ where
.collect();
let mut network = TestNetwork {
nodes: (0..good_num).map(NodeUid).map(new_node_by_id).collect(),
observer: new_node_by_id(NodeUid(good_num + adv_num)).1,
adversary: adversary(adv_nodes.clone()),
adv_nodes,
};
@ -232,6 +234,9 @@ where
node.queue.push_back((sender_id, msg.message.clone()))
}
}
self.observer
.queue
.push_back((sender_id, msg.message.clone()));
self.adversary.push_message(sender_id, msg);
}
Target::Node(to_id) => {
@ -247,6 +252,9 @@ where
}
}
}
while !self.observer.queue.is_empty() {
self.observer.handle_message();
}
}
/// Handles a queued message in a randomly selected node and returns the selected node's ID.