Moved queue into progress state.

This commit is contained in:
Marc Brinkmann 2018-10-29 16:46:37 +01:00
parent b58c977627
commit 71a7331958
1 changed files with 96 additions and 62 deletions

View File

@ -117,6 +117,9 @@ struct DropAndReAddProgress<V, N> {
awaiting_removal: collections::BTreeSet<N>,
awaiting_addition: collections::BTreeSet<N>,
expected_outputs: collections::BTreeMap<N, collections::BTreeSet<V>>,
queues: collections::BTreeMap<N, Vec<usize>>,
batch_size: usize,
contribution_size: usize,
}
// FIXME: Do not pin to `usize`.
@ -125,19 +128,31 @@ where
N: NodeIdT + Serialize + DeserializeOwned + Rand,
{
fn from_net(net: &VirtualNet<DynamicHoneyBadger<Vec<usize>, N>>) -> Self {
let total_txs = 200;
let expected_outputs: collections::BTreeMap<_, collections::BTreeSet<_>> = net
.correct_nodes()
.map(|node| (node.id().clone(), (0..total_txs).collect()))
.collect();
let queues: collections::BTreeMap<_, Vec<_>> = expected_outputs
.iter()
.map(|(id, txs)| (id.clone(), txs.iter().cloned().collect()))
.collect();
DropAndReAddProgress {
awaiting_removal: net.correct_nodes().map(|n| n.id().clone()).collect(),
awaiting_addition: net.correct_nodes().map(|n| n.id().clone()).collect(),
expected_outputs: net
.correct_nodes()
// FIXME: This should not be 0..10?!
.map(|n| (n.id().clone(), (0..10usize).into_iter().collect()))
.collect(),
expected_outputs,
queues,
batch_size: 10,
contribution_size: 10,
}
}
fn process_step(
fn process_step<R: Rng>(
&mut self,
rng: &mut R,
node_id: N,
step: &Step<DynamicHoneyBadger<Vec<usize>, N>>,
net: &mut VirtualNet<DynamicHoneyBadger<Vec<usize>, N>>,
@ -162,7 +177,7 @@ where
node_id.clone(),
Input::Change(Change::Add(pivot_node_id.clone(), pk)),
).expect("failed to send `Add` input");
self.process_step(node_id.clone(), &step, net);
self.process_step(rng, node_id.clone(), &step, net);
}
ChangeState::Complete(Change::Add(pivot_node_id, _)) => {
@ -184,17 +199,67 @@ where
}
}
}
let step = {
// Find the node's input queue.
let queue: &mut Vec<_> = self
.queues
.get_mut(&node_id)
.expect("queue for node disappeared");
// Examine potential algorithm output.
// FIXME: Use owned step.
for batch in &step.output {
info!(
"Received epoch {} batch on node {:?}.",
batch.epoch(),
node_id,
);
for tx in batch.iter() {
// Remove the confirmed contribution from the input queue.
let index = queue.iter().position(|v| v == tx);
if let Some(idx) = index {
assert_eq!(queue.remove(idx), *tx);
}
// Add it to the set of received outputs.
if !net[node_id.clone()].is_faulty() {
self.expected_outputs
.get_mut(&node_id)
.expect("output set disappeared")
.remove(tx);
}
}
}
// If not done, check if we still want to propose something.
if !step.output.is_empty() {
// Out of the remaining transactions, select a suitable amount.
let proposal =
choose_contribution(rng, queue, self.batch_size, self.contribution_size);
Some(
net.send_input(node_id.clone(), Input::User(proposal))
.expect("could not send follow-up transaction"),
)
} else {
None
}
};
step.map(|step| self.process_step(rng, node_id.clone(), &step, net));
}
fn process_steps(
&mut self,
steps: Steps<DynamicHoneyBadger<Vec<usize>, N>>,
net: &mut VirtualNet<DynamicHoneyBadger<Vec<usize>, N>>,
) {
for (node_id, step) in steps.0 {
self.process_step(node_id, &step, net)
}
}
// fn process_steps(
// &mut self,
// steps: Steps<DynamicHoneyBadger<Vec<usize>, N>>,
// net: &mut VirtualNet<DynamicHoneyBadger<Vec<usize>, N>>,
// ) {
// for (node_id, step) in steps.0 {
// self.process_step(node_id, &step, net)
// }
// }
// Checks if the test has finished successfully.
//
@ -277,7 +342,7 @@ fn do_drop_and_readd(cfg: TestConfig) {
.send_input(id, Input::User(proposal))
.expect("could not send initial transaction");
progress.process_step(id, &step, &mut net);
progress.process_step(&mut rng, id, &step, &mut net);
}
// Afterwards, remove a specific node from the dynamic honey badger network.
@ -285,59 +350,28 @@ fn do_drop_and_readd(cfg: TestConfig) {
.broadcast_input(&Input::Change(Change::Remove(pivot_node_id)))
.expect("broadcasting failed");
for (node_id, step) in steps.0 {
progress.process_step(node_id, &step, &mut net);
progress.process_step(&mut rng, node_id, &step, &mut net);
}
while !progress.finished() {
// First, crank the network, recording the output.
let (node_id, step) = net.crank_expect();
progress.process_step(node_id, &step, &mut net);
progress.process_step(&mut rng, node_id, &step, &mut net);
// Record whether or not we received some output.
let has_output = !step.output.is_empty();
// // Record whether or not we received some output.
// let has_output = !step.output.is_empty();
// Find the node's input queue.
let queue: &mut Vec<_> = queues
.get_mut(&node_id)
.expect("queue for node disappeared");
// // If not done, check if we still want to propose something.
// if has_output {
// // Out of the remaining transactions, select a suitable amount.
// let proposal =
// choose_contribution(&mut rng, queue, cfg.batch_size, cfg.contribution_size);
// Examine potential algorithm output.
for batch in step.output {
info!(
"Received epoch {} batch on node {:?}.",
batch.epoch(),
node_id,
);
for tx in batch.iter() {
// Remove the confirmed contribution from the input queue.
let index = queue.iter().position(|v| v == tx);
if let Some(idx) = index {
assert_eq!(queue.remove(idx), *tx);
}
// Add it to the set of received outputs.
if !net[node_id].is_faulty() {
progress
.expected_outputs
.get_mut(&node_id)
.expect("output set disappeared")
.remove(tx);
}
}
}
// If not done, check if we still want to propose something.
if has_output {
// Out of the remaining transactions, select a suitable amount.
let proposal =
choose_contribution(&mut rng, queue, cfg.batch_size, cfg.contribution_size);
let step = net
.send_input(node_id, Input::User(proposal))
.expect("could not send follow-up transaction");
progress.process_step(node_id, &step, &mut net);
}
// let step = net
// .send_input(node_id, Input::User(proposal))
// .expect("could not send follow-up transaction");
// progress.process_step(node_id, &step, &mut net);
// }
}
// As a final step, we verify that all nodes have arrived at the same conclusion.