Merge pull request #83 from poanetwork/afck-observer

Allow observer nodes in all algorithms.
This commit is contained in:
Andreas Fackler 2018-06-26 15:12:46 +02:00 committed by GitHub
commit 78fdf63540
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 135 additions and 68 deletions

View File

@ -227,13 +227,15 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
self.decision = Some(input); self.decision = Some(input);
self.output = Some(input); self.output = Some(input);
self.terminated = true; self.terminated = true;
} self.send_bval(input)?;
self.send_aux(input)
} else {
// Set the initial estimated value to the input value. // Set the initial estimated value to the input value.
self.estimated = Some(input); self.estimated = Some(input);
// Record the input value as sent. // Record the input value as sent.
self.send_bval(input) self.send_bval(input)
} }
}
/// Acceptance check to be performed before setting the input value. /// Acceptance check to be performed before setting the input value.
pub fn accepts_input(&self) -> bool { pub fn accepts_input(&self) -> bool {
@ -305,6 +307,9 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
} }
fn send_bval(&mut self, b: bool) -> AgreementResult<()> { fn send_bval(&mut self, b: bool) -> AgreementResult<()> {
if !self.netinfo.is_peer() {
return Ok(());
}
// Record the value `b` as sent. // Record the value `b` as sent.
self.sent_bval.insert(b); self.sent_bval.insert(b);
// Multicast `BVal`. // Multicast `BVal`.
@ -321,12 +326,17 @@ impl<NodeUid: Clone + Debug + Ord> Agreement<NodeUid> {
return Ok(()); return Ok(());
} }
// Trigger the start of the `Conf` round.
self.conf_round = true;
if !self.netinfo.is_peer() {
return Ok(());
}
let v = self.bin_values; let v = self.bin_values;
// Multicast `Conf`. // Multicast `Conf`.
self.messages self.messages
.push_back(AgreementContent::Conf(v).with_epoch(self.epoch)); .push_back(AgreementContent::Conf(v).with_epoch(self.epoch));
// Trigger the start of the `Conf` round.
self.conf_round = true;
// Receive the `Conf` message locally. // Receive the `Conf` message locally.
let our_uid = &self.netinfo.our_uid().clone(); let our_uid = &self.netinfo.our_uid().clone();
self.handle_conf(our_uid, v) self.handle_conf(our_uid, v)

View File

@ -292,12 +292,7 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
} }
// Otherwise multicast the proof in an `Echo` message, and handle it ourselves. // Otherwise multicast the proof in an `Echo` message, and handle it ourselves.
self.echo_sent = true; self.send_echo(p)
let our_uid = &self.netinfo.our_uid().clone();
self.handle_echo(our_uid, p.clone())?;
let echo_msg = Target::All.message(BroadcastMessage::Echo(p));
self.messages.push_back(echo_msg);
Ok(())
} }
/// Handles a received `Echo` message. /// Handles a received `Echo` message.
@ -327,11 +322,7 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
} }
// Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`. // Upon receiving `N - f` `Echo`s with this root hash, multicast `Ready`.
self.ready_sent = true; self.send_ready(&hash)
let ready_msg = Target::All.message(BroadcastMessage::Ready(hash.clone()));
self.messages.push_back(ready_msg);
let our_uid = &self.netinfo.our_uid().clone();
self.handle_ready(our_uid, &hash)
} }
/// Handles a received `Ready` message. /// Handles a received `Ready` message.
@ -352,11 +343,33 @@ impl<NodeUid: Debug + Clone + Ord> Broadcast<NodeUid> {
// has not yet been sent, multicast Ready(h). // has not yet been sent, multicast Ready(h).
if self.count_readys(hash) == self.netinfo.num_faulty() + 1 && !self.ready_sent { if self.count_readys(hash) == self.netinfo.num_faulty() + 1 && !self.ready_sent {
// Enqueue a broadcast of a Ready message. // Enqueue a broadcast of a Ready message.
self.send_ready(hash)?;
}
self.compute_output(hash)
}
/// Sends an `Echo` message and handles it. Does nothing if we are only an observer.
fn send_echo(&mut self, p: Proof<Vec<u8>>) -> BroadcastResult<()> {
self.echo_sent = true;
if !self.netinfo.is_peer() {
return Ok(());
}
let echo_msg = Target::All.message(BroadcastMessage::Echo(p.clone()));
self.messages.push_back(echo_msg);
let our_uid = &self.netinfo.our_uid().clone();
self.handle_echo(our_uid, p)
}
/// Sends a `Ready` message and handles it. Does nothing if we are only an observer.
fn send_ready(&mut self, hash: &[u8]) -> BroadcastResult<()> {
self.ready_sent = true; self.ready_sent = true;
if !self.netinfo.is_peer() {
return Ok(());
}
let ready_msg = Target::All.message(BroadcastMessage::Ready(hash.to_vec())); let ready_msg = Target::All.message(BroadcastMessage::Ready(hash.to_vec()));
self.messages.push_back(ready_msg); self.messages.push_back(ready_msg);
} let our_uid = &self.netinfo.our_uid().clone();
self.compute_output(&hash) self.handle_ready(our_uid, hash)
} }
/// Checks whether the condition for output are met for this hash, and if so, sets the output /// Checks whether the condition for output are met for this hash, and if so, sets the output

View File

@ -126,6 +126,9 @@ where
} }
fn get_coin(&mut self) -> Result<()> { fn get_coin(&mut self) -> Result<()> {
if !self.netinfo.is_peer() {
return self.try_output();
}
let share = self.netinfo.secret_key().sign(&self.nonce); let share = self.netinfo.secret_key().sign(&self.nonce);
self.messages.push_back(CommonCoinMessage(share.clone())); self.messages.push_back(CommonCoinMessage(share.clone()));
let id = self.netinfo.our_uid().clone(); let id = self.netinfo.our_uid().clone();
@ -140,6 +143,13 @@ where
return Ok(()); return Ok(());
} }
self.received_shares.insert(sender_id.clone(), share); self.received_shares.insert(sender_id.clone(), share);
} else {
return Err(ErrorKind::UnknownSender.into());
}
self.try_output()
}
fn try_output(&mut self) -> Result<()> {
let received_shares = &self.received_shares; let received_shares = &self.received_shares;
if self.had_input && received_shares.len() > self.netinfo.num_faulty() { if self.had_input && received_shares.len() > self.netinfo.num_faulty() {
let sig = self.combine_and_verify_sig()?; let sig = self.combine_and_verify_sig()?;
@ -149,9 +159,6 @@ where
self.terminated = true; self.terminated = true;
} }
Ok(()) Ok(())
} else {
Err(ErrorKind::UnknownSender.into())
}
} }
fn combine_and_verify_sig(&self) -> Result<Signature> { fn combine_and_verify_sig(&self) -> Result<Signature> {

View File

@ -180,6 +180,9 @@ impl<NodeUid: Clone + Debug + Ord> CommonSubset<NodeUid> {
/// Common Subset input message handler. It receives a value for broadcast /// Common Subset input message handler. It receives a value for broadcast
/// and redirects it to the corresponding broadcast instance. /// and redirects it to the corresponding broadcast instance.
pub fn send_proposed_value(&mut self, value: ProposedValue) -> CommonSubsetResult<()> { pub fn send_proposed_value(&mut self, value: ProposedValue) -> CommonSubsetResult<()> {
if !self.netinfo.is_peer() {
return Ok(());
}
let uid = self.netinfo.our_uid().clone(); let uid = self.netinfo.our_uid().clone();
// Upon receiving input v_i , input v_i to RBC_i. See Figure 2. // Upon receiving input v_i , input v_i to RBC_i. See Figure 2.
self.process_broadcast(&uid, |bc| bc.input(value)) self.process_broadcast(&uid, |bc| bc.input(value))

View File

@ -28,6 +28,7 @@ error_chain!{
errors { errors {
UnknownSender UnknownSender
ObserverCannotPropose
} }
} }
@ -151,8 +152,12 @@ where
&mut self, &mut self,
txs: I, txs: I,
) -> HoneyBadgerResult<()> { ) -> HoneyBadgerResult<()> {
if self.netinfo.is_peer() {
self.buffer.extend(txs); self.buffer.extend(txs);
Ok(()) Ok(())
} else {
Err(ErrorKind::ObserverCannotPropose.into())
}
} }
/// Empties and returns the transaction buffer. /// Empties and returns the transaction buffer.
@ -162,6 +167,9 @@ where
/// Proposes a new batch in the current epoch. /// Proposes a new batch in the current epoch.
fn propose(&mut self) -> HoneyBadgerResult<()> { fn propose(&mut self) -> HoneyBadgerResult<()> {
if !self.netinfo.is_peer() {
return Ok(());
}
let proposal = self.choose_transactions()?; let proposal = self.choose_transactions()?;
let cs = match self.common_subsets.entry(self.epoch) { let cs = match self.common_subsets.entry(self.epoch) {
Entry::Occupied(entry) => entry.into_mut(), Entry::Occupied(entry) => entry.into_mut(),
@ -420,21 +428,7 @@ where
self.verify_pending_decryption_shares(&proposer_id, &ciphertext); self.verify_pending_decryption_shares(&proposer_id, &ciphertext);
self.remove_incorrect_decryption_shares(&proposer_id, incorrect_senders); self.remove_incorrect_decryption_shares(&proposer_id, incorrect_senders);
if let Some(share) = self.netinfo.secret_key().decrypt_share(&ciphertext) { if !self.send_decryption_share(&proposer_id, &ciphertext)? {
// Send the share to remote nodes.
self.messages.0.push_back(
Target::All.message(
MessageContent::DecryptionShare {
proposer_id: proposer_id.clone(),
share: share.clone(),
}.with_epoch(self.epoch),
),
);
let our_id = self.netinfo.our_uid().clone();
let epoch = self.epoch;
// Receive the share locally.
self.handle_decryption_share_message(&our_id, epoch, proposer_id.clone(), share)?;
} else {
warn!("Share decryption failed for proposer {:?}", proposer_id); warn!("Share decryption failed for proposer {:?}", proposer_id);
// TODO: Log the decryption failure. // TODO: Log the decryption failure.
continue; continue;
@ -448,6 +442,33 @@ where
Ok(()) Ok(())
} }
/// Verifies the ciphertext and sends decryption shares. Returns whether it is valid.
fn send_decryption_share(
&mut self,
proposer_id: &NodeUid,
ciphertext: &Ciphertext,
) -> HoneyBadgerResult<bool> {
if !self.netinfo.is_peer() {
return Ok(ciphertext.verify());
}
let share = match self.netinfo.secret_key().decrypt_share(&ciphertext) {
None => return Ok(false),
Some(share) => share,
};
// Send the share to remote nodes.
let content = MessageContent::DecryptionShare {
proposer_id: proposer_id.clone(),
share: share.clone(),
};
let message = Target::All.message(content.with_epoch(self.epoch));
self.messages.0.push_back(message);
let our_id = self.netinfo.our_uid().clone();
let epoch = self.epoch;
// Receive the share locally.
self.handle_decryption_share_message(&our_id, epoch, proposer_id.clone(), share)?;
Ok(true)
}
/// Verifies the shares of the current epoch that are pending verification. Returned are the /// Verifies the shares of the current epoch that are pending verification. Returned are the
/// senders with incorrect pending shares. /// senders with incorrect pending shares.
fn verify_pending_decryption_shares( fn verify_pending_decryption_shares(

View File

@ -142,6 +142,7 @@ pub struct NetworkInfo<NodeUid> {
all_uids: BTreeSet<NodeUid>, all_uids: BTreeSet<NodeUid>,
num_nodes: usize, num_nodes: usize,
num_faulty: usize, num_faulty: usize,
is_peer: bool,
secret_key: ClearOnDrop<Box<SecretKey>>, secret_key: ClearOnDrop<Box<SecretKey>>,
public_key_set: PublicKeySet, public_key_set: PublicKeySet,
node_indices: BTreeMap<NodeUid, usize>, node_indices: BTreeMap<NodeUid, usize>,
@ -154,10 +155,8 @@ impl<NodeUid: Clone + Ord> NetworkInfo<NodeUid> {
secret_key: ClearOnDrop<Box<SecretKey>>, secret_key: ClearOnDrop<Box<SecretKey>>,
public_key_set: PublicKeySet, public_key_set: PublicKeySet,
) -> Self { ) -> Self {
if !all_uids.contains(&our_uid) {
panic!("Missing own ID");
}
let num_nodes = all_uids.len(); let num_nodes = all_uids.len();
let is_peer = all_uids.contains(&our_uid);
let node_indices = all_uids let node_indices = all_uids
.iter() .iter()
.cloned() .cloned()
@ -169,6 +168,7 @@ impl<NodeUid: Clone + Ord> NetworkInfo<NodeUid> {
all_uids, all_uids,
num_nodes, num_nodes,
num_faulty: (num_nodes - 1) / 3, num_faulty: (num_nodes - 1) / 3,
is_peer,
secret_key, secret_key,
public_key_set, public_key_set,
node_indices, node_indices,
@ -218,4 +218,10 @@ impl<NodeUid: Clone + Ord> NetworkInfo<NodeUid> {
pub fn invocation_id(&self) -> Vec<u8> { pub fn invocation_id(&self) -> Vec<u8> {
self.public_key_set.public_key().to_bytes() self.public_key_set.public_key().to_bytes()
} }
/// Returns `true` if this node takes part in the consensus itself. If not, it is only an
/// observer.
pub fn is_peer(&self) -> bool {
self.is_peer
}
} }

View File

@ -57,6 +57,7 @@ fn test_agreement<A: Adversary<Agreement<NodeUid>>>(
expected = Some(node.outputs()[0]); expected = Some(node.outputs()[0]);
} }
} }
assert!(expected.iter().eq(network.observer.outputs()));
} }
fn test_agreement_different_sizes<A, F>(new_adversary: F) fn test_agreement_different_sizes<A, F>(new_adversary: F)

View File

@ -107,6 +107,7 @@ fn test_broadcast<A: Adversary<Broadcast<NodeUid>>>(
for node in network.nodes.values() { for node in network.nodes.values() {
assert!(once(&proposed_value.to_vec()).eq(node.outputs())); assert!(once(&proposed_value.to_vec()).eq(node.outputs()));
} }
assert!(once(&proposed_value.to_vec()).eq(network.observer.outputs()));
} }
fn new_broadcast(netinfo: Rc<NetworkInfo<NodeUid>>) -> Broadcast<NodeUid> { fn new_broadcast(netinfo: Rc<NetworkInfo<NodeUid>>) -> Broadcast<NodeUid> {

View File

@ -21,23 +21,18 @@ use network::{Adversary, MessageScheduler, NodeUid, SilentAdversary, TestNetwork
/// Tests a network of Common Coin instances with an optional expected value. Outputs the computed /// Tests a network of Common Coin instances with an optional expected value. Outputs the computed
/// common coin value if the test is successful. /// common coin value if the test is successful.
fn test_common_coin<A>( fn test_common_coin<A>(mut network: TestNetwork<A, CommonCoin<NodeUid, String>>) -> bool
mut network: TestNetwork<A, CommonCoin<NodeUid, String>>,
expected_coin: Option<bool>,
) -> bool
where where
A: Adversary<CommonCoin<NodeUid, String>>, A: Adversary<CommonCoin<NodeUid, String>>,
{ {
let ids: Vec<NodeUid> = network.nodes.keys().cloned().collect(); network.input_all(());
for id in ids { network.observer.input(()); // Observer will only return after `input` was called.
network.input(id, ());
}
// Handle messages until all good nodes have terminated. // Handle messages until all good nodes have terminated.
while !network.nodes.values().all(TestNode::terminated) { while !network.nodes.values().all(TestNode::terminated) {
network.step(); network.step();
} }
let mut expected = expected_coin; let mut expected = None;
// Verify that all instances output the same value. // Verify that all instances output the same value.
for node in network.nodes.values() { for node in network.nodes.values() {
if let Some(b) = expected { if let Some(b) = expected {
@ -48,6 +43,7 @@ where
} }
} }
// Now `expected` is the unique output of all good nodes. // Now `expected` is the unique output of all good nodes.
assert!(expected.iter().eq(network.observer.outputs()));
expected.unwrap() expected.unwrap()
} }
@ -109,7 +105,7 @@ where
let new_common_coin = |netinfo: _| CommonCoin::new(netinfo, nonce.clone()); let new_common_coin = |netinfo: _| CommonCoin::new(netinfo, nonce.clone());
let network = let network =
TestNetwork::new(num_good_nodes, num_faulty_nodes, adversary, new_common_coin); TestNetwork::new(num_good_nodes, num_faulty_nodes, adversary, new_common_coin);
let coin = test_common_coin(network, None); let coin = test_common_coin(network);
if coin { if coin {
count_true += 1; count_true += 1;
} else { } else {

View File

@ -49,6 +49,7 @@ fn test_common_subset<A: Adversary<CommonSubset<NodeUid>>>(
expected = Some(node.outputs()[0].clone()); expected = Some(node.outputs()[0].clone());
} }
let output = expected.unwrap(); let output = expected.unwrap();
assert!(once(&output).eq(network.observer.outputs()));
// The Common Subset algorithm guarantees that more than two thirds of the proposed elements // The Common Subset algorithm guarantees that more than two thirds of the proposed elements
// are in the set. // are in the set.
assert!(output.len() * 3 > inputs.len() * 2); assert!(output.len() * 3 > inputs.len() * 2);

View File

@ -36,6 +36,12 @@ impl<D: DistAlgorithm> TestNode<D> {
self.algo.terminated() self.algo.terminated()
} }
/// Inputs a value into the instance.
pub fn input(&mut self, input: D::Input) {
self.algo.input(input).expect("input");
self.outputs.extend(self.algo.output_iter());
}
/// Creates a new test node with the given broadcast instance. /// Creates a new test node with the given broadcast instance.
fn new(mut algo: D) -> TestNode<D> { fn new(mut algo: D) -> TestNode<D> {
let outputs = algo.output_iter().collect(); let outputs = algo.output_iter().collect();
@ -56,12 +62,6 @@ impl<D: DistAlgorithm> TestNode<D> {
.expect("handling message"); .expect("handling message");
self.outputs.extend(self.algo.output_iter()); self.outputs.extend(self.algo.output_iter());
} }
/// Inputs a value into the instance.
fn input(&mut self, input: D::Input) {
self.algo.input(input).expect("input");
self.outputs.extend(self.algo.output_iter());
}
} }
/// A strategy for picking the next good node to handle a message. /// A strategy for picking the next good node to handle a message.
@ -148,6 +148,7 @@ where
<D as DistAlgorithm>::NodeUid: Hash, <D as DistAlgorithm>::NodeUid: Hash,
{ {
pub nodes: BTreeMap<D::NodeUid, TestNode<D>>, pub nodes: BTreeMap<D::NodeUid, TestNode<D>>,
pub observer: TestNode<D>,
pub adv_nodes: BTreeMap<D::NodeUid, Rc<NetworkInfo<D::NodeUid>>>, pub adv_nodes: BTreeMap<D::NodeUid, Rc<NetworkInfo<D::NodeUid>>>,
adversary: A, adversary: A,
} }
@ -202,6 +203,7 @@ where
.collect(); .collect();
let mut network = TestNetwork { let mut network = TestNetwork {
nodes: (0..good_num).map(NodeUid).map(new_node_by_id).collect(), nodes: (0..good_num).map(NodeUid).map(new_node_by_id).collect(),
observer: new_node_by_id(NodeUid(good_num + adv_num)).1,
adversary: adversary(adv_nodes.clone()), adversary: adversary(adv_nodes.clone()),
adv_nodes, adv_nodes,
}; };
@ -232,6 +234,9 @@ where
node.queue.push_back((sender_id, msg.message.clone())) node.queue.push_back((sender_id, msg.message.clone()))
} }
} }
self.observer
.queue
.push_back((sender_id, msg.message.clone()));
self.adversary.push_message(sender_id, msg); self.adversary.push_message(sender_id, msg);
} }
Target::Node(to_id) => { Target::Node(to_id) => {
@ -247,6 +252,9 @@ where
} }
} }
} }
while !self.observer.queue.is_empty() {
self.observer.handle_message();
}
} }
/// Handles a queued message in a randomly selected node and returns the selected node's ID. /// Handles a queued message in a randomly selected node and returns the selected node's ID.