Fix fork race condition in optimistic violation tower tests (#19192)

* Fix fork race condition in optimistic violation tower tests

* clippy

* pr comments
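The race, in short: these tests read a validator's last vote from its persisted tower, then consult blockstore for that slot. If the validator was the leader for the voted slot, the tower can be written before broadcast inserts the block into blockstore, so the blockstore lookup can race and fail. The fix polls blockstore until the voted slot is full. Below is a minimal sketch of that pattern, mirroring the wait_for_last_vote_in_tower_to_land_in_ledger helper added in the diff; open_blockstore and last_vote_in_tower are this test file's existing helpers, and the function name here is illustrative only.

use std::{path::Path, thread::sleep, time::Duration};
use solana_sdk::{clock::Slot, pubkey::Pubkey};

// Block until the tower's last voted slot is fully present in blockstore.
fn wait_for_vote_to_land(ledger_path: &Path, node_pubkey: &Pubkey) -> Slot {
    // The tower is persisted at vote time, possibly before broadcast has
    // inserted the block, so the vote can be "ahead" of blockstore.
    let (last_vote, _) = last_vote_in_tower(ledger_path, node_pubkey).unwrap();
    loop {
        // Reopen each iteration so we observe writes from the validator process.
        let blockstore = open_blockstore(ledger_path);
        if blockstore.is_full(last_vote) {
            return last_vote;
        }
        sleep(Duration::from_millis(100));
    }
}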
Ashwin Sekar 2021-08-26 21:41:05 -07:00 committed by GitHub
parent 2d7f036afd
commit 73c1ea7de6
1 changed file with 111 additions and 64 deletions


@@ -1130,6 +1130,16 @@ fn test_fork_choice_refresh_old_votes() {
let lighter_fork_ledger_path = cluster.ledger_path(&context.lighter_fork_validator_key);
let heaviest_ledger_path = cluster.ledger_path(&context.heaviest_validator_key);
// Get the latest votes. We make sure to wait until the vote has landed in
// blockstore. This is important because if we were the leader for the block, there
// is a possibility of voting before broadcast has inserted the block into blockstore.
let lighter_fork_latest_vote = wait_for_last_vote_in_tower_to_land_in_ledger(
&lighter_fork_ledger_path,
&context.lighter_fork_validator_key,
);
let heaviest_fork_latest_vote =
wait_for_last_vote_in_tower_to_land_in_ledger(&heaviest_ledger_path, &context.heaviest_validator_key);
// Open ledgers
let smallest_blockstore = open_blockstore(&smallest_ledger_path);
let lighter_fork_blockstore = open_blockstore(&lighter_fork_ledger_path);
@@ -1137,22 +1147,6 @@ fn test_fork_choice_refresh_old_votes() {
info!("Opened blockstores");
// Get latest votes. We additionally check to make sure the vote has landed in
// blockstore. This is important because if we were the leader for the block there
// is a possibility of voting before broadcast has inserted in blockstore.
let (lighter_fork_latest_vote, _) = last_vote_in_tower(
&lighter_fork_ledger_path,
&context.lighter_fork_validator_key,
)
.unwrap();
let (heaviest_fork_latest_vote, _) =
last_vote_in_tower(&heaviest_ledger_path, &context.heaviest_validator_key).unwrap();
while !lighter_fork_blockstore.is_full(lighter_fork_latest_vote)
|| !heaviest_blockstore.is_full(heaviest_fork_latest_vote)
{
sleep(Duration::from_millis(100));
}
// Find the first slot on the smaller fork
let lighter_ancestors: BTreeSet<Slot> = std::iter::once(lighter_fork_latest_vote)
.chain(AncestorIterator::new(
@@ -1182,27 +1176,11 @@ fn test_fork_choice_refresh_old_votes() {
// Copy all the blocks from the smaller partition up to `first_slot_in_lighter_partition`
// into the smallest validator's blockstore
for lighter_slot in std::iter::once(first_slot_in_lighter_partition).chain(
AncestorIterator::new(first_slot_in_lighter_partition, &lighter_fork_blockstore),
) {
let lighter_slot_meta =
lighter_fork_blockstore.meta(lighter_slot).unwrap().unwrap();
assert!(lighter_slot_meta.is_full());
// Get the shreds from the leader of the smaller fork
let lighter_fork_data_shreds = lighter_fork_blockstore
.get_data_shreds_for_slot(lighter_slot, 0)
.unwrap();
// Insert those shreds into the smallest validator's blockstore
smallest_blockstore
.insert_shreds(lighter_fork_data_shreds, None, false)
.unwrap();
// Check insert succeeded
let new_meta = smallest_blockstore.meta(lighter_slot).unwrap().unwrap();
assert!(new_meta.is_full());
assert_eq!(new_meta.last_index, lighter_slot_meta.last_index);
}
copy_blocks(
first_slot_in_lighter_partition,
&lighter_fork_blockstore,
&smallest_blockstore,
);
// Restart the smallest validator that we killed earlier in `on_partition_start()`
drop(smallest_blockstore);
@@ -2630,6 +2608,20 @@ fn purge_slots(blockstore: &Blockstore, start_slot: Slot, slot_count: Slot) {
blockstore.purge_slots(start_slot, start_slot + slot_count, PurgeType::Exact);
}
fn copy_blocks(end_slot: Slot, source: &Blockstore, dest: &Blockstore) {
for slot in std::iter::once(end_slot).chain(AncestorIterator::new(end_slot, source)) {
let source_meta = source.meta(slot).unwrap().unwrap();
assert!(source_meta.is_full());
let shreds = source.get_data_shreds_for_slot(slot, 0).unwrap();
dest.insert_shreds(shreds, None, false).unwrap();
let dest_meta = dest.meta(slot).unwrap().unwrap();
assert!(dest_meta.is_full());
assert_eq!(dest_meta.last_index, source_meta.last_index);
}
}
fn restore_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<Tower> {
let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf());
@@ -2649,6 +2641,21 @@ fn last_vote_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<(Slot,
restore_tower(tower_path, node_pubkey).map(|tower| tower.last_voted_slot_hash().unwrap())
}
// Fetches the last vote in the tower, blocking until it has also appeared in blockstore.
// Fails if tower is empty
fn wait_for_last_vote_in_tower_to_land_in_ledger(ledger_path: &Path, node_pubkey: &Pubkey) -> Slot {
let (last_vote, _) = last_vote_in_tower(ledger_path, node_pubkey).unwrap();
loop {
// We reopen in a loop to make sure we get updates
let blockstore = open_blockstore(ledger_path);
if blockstore.is_full(last_vote) {
break;
}
sleep(Duration::from_millis(100));
}
last_vote
}
fn root_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<Slot> {
restore_tower(tower_path, node_pubkey).map(|tower| tower.root())
}
@@ -2660,12 +2667,19 @@ fn remove_tower(tower_path: &Path, node_pubkey: &Pubkey) {
// A somewhat convoluted test case, but it roughly follows this theoretical scenario:
//
// Step 1: You have validator A + B with 31% and 36% of the stake:
// Step 1: You have validator A + B with 31% and 36% of the stake. Run only validator B:
//
// S0 -> S1 -> S2 -> S3 (A + B vote, optimistically confirmed)
// S0 -> S1 -> S2 -> S3 (B vote)
//
// Step 2: Turn off A + B, and truncate the ledger after slot `S3` (simulate votes not
// Step 2: Turn off B, and truncate the ledger after slot `S3` (simulate votes not
// landing in next slot).
// Copy ledger fully to validator A and validator C
//
// Step 3: Turn on A, and have it vote up to S3. Truncate anything past slot `S3`.
//
// S0 -> S1 -> S2 -> S3 (A & B vote, optimistically confirmed)
//
// Step 4:
// Start validator C with 33% of the stake with same ledger, but only up to slot S2.
// Have `C` generate some blocks like:
//
@@ -2684,7 +2698,7 @@ fn remove_tower(tower_path: &Path, node_pubkey: &Pubkey) {
// |
// -> S4 (C) -> S5
//
// Step 4:
// Step 5:
// Without the persisted tower:
// `A` would choose to vote on the fork with `S4 -> S5`. This is true even if `A`
// generates a new fork starting at slot `S3` because `C` has more stake than `A`
@@ -2749,14 +2763,14 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
let val_b_ledger_path = cluster.ledger_path(&validator_b_pubkey);
let val_c_ledger_path = cluster.ledger_path(&validator_c_pubkey);
// Immediately kill validator C
// Immediately kill validators A and C
let validator_a_info = cluster.exit_node(&validator_a_pubkey);
let validator_c_info = cluster.exit_node(&validator_c_pubkey);
// Step 1:
// Let validator A, B, (D) run for a while.
let (mut validator_a_finished, mut validator_b_finished) = (false, false);
// Let validator B, (D) run for a while.
let now = Instant::now();
while !(validator_a_finished && validator_b_finished) {
loop {
let elapsed = now.elapsed();
if elapsed > Duration::from_secs(30) {
panic!(
@@ -2766,37 +2780,71 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
}
sleep(Duration::from_millis(100));
if let Some((last_vote, _)) = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey) {
if !validator_a_finished && last_vote >= next_slot_on_a {
validator_a_finished = true;
}
}
if let Some((last_vote, _)) = last_vote_in_tower(&val_b_ledger_path, &validator_b_pubkey) {
if !validator_b_finished && last_vote >= next_slot_on_a {
validator_b_finished = true;
if last_vote >= next_slot_on_a {
break;
}
}
}
// kill them at once after the above loop; otherwise one might stall the other!
let validator_a_info = cluster.exit_node(&validator_a_pubkey);
// kill B
let _validator_b_info = cluster.exit_node(&validator_b_pubkey);
// Step 2:
// Stop validator and truncate ledger
// Stop validator and truncate ledger; copy B's ledger over to A
info!("truncate validator C's ledger");
{
// first copy from validator A's ledger
// first copy from validator B's ledger
std::fs::remove_dir_all(&validator_c_info.info.ledger_path).unwrap();
let mut opt = fs_extra::dir::CopyOptions::new();
opt.copy_inside = true;
fs_extra::dir::copy(&val_a_ledger_path, &val_c_ledger_path, &opt).unwrap();
// Remove A's tower in the C's new copied ledger
remove_tower(&validator_c_info.info.ledger_path, &validator_a_pubkey);
fs_extra::dir::copy(&val_b_ledger_path, &val_c_ledger_path, &opt).unwrap();
// Remove B's tower in C's newly copied ledger
remove_tower(&val_c_ledger_path, &validator_b_pubkey);
let blockstore = open_blockstore(&validator_c_info.info.ledger_path);
let blockstore = open_blockstore(&val_c_ledger_path);
purge_slots(&blockstore, base_slot + 1, truncated_slots);
}
info!("truncate validator A's ledger");
info!("Create validator A's ledger");
{
// Find latest vote in B, and wait for it to reach blockstore
let b_last_vote = wait_for_last_vote_in_tower_to_land_in_ledger(&val_b_ledger_path, &validator_b_pubkey);
// Now we copy these blocks to A
let b_blockstore = open_blockstore(&val_b_ledger_path);
let a_blockstore = open_blockstore(&val_a_ledger_path);
copy_blocks(b_last_vote, &b_blockstore, &a_blockstore);
// Purge unnecessary slots
purge_slots(&a_blockstore, next_slot_on_a + 1, truncated_slots);
}
// Step 3:
// Restart A so that it can vote for the slots in B's fork
info!("Restarting A");
cluster.restart_node(
&validator_a_pubkey,
validator_a_info,
SocketAddrSpace::Unspecified,
);
info!("Waiting for A to vote");
loop {
if let Some((last_vote_slot, _)) =
last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey)
{
if last_vote_slot >= next_slot_on_a {
info!("Validator A has caught up: {}", last_vote_slot);
break;
} else {
info!("Validator A latest vote: {}", last_vote_slot);
}
}
sleep(Duration::from_millis(20));
}
info!("Killing A");
let validator_a_info = cluster.exit_node(&validator_a_pubkey);
{
let blockstore = open_blockstore(&val_a_ledger_path);
purge_slots(&blockstore, next_slot_on_a + 1, truncated_slots);
@@ -2816,10 +2864,9 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
}
}
// Step 3:
// Step 4:
// Run validator C only to make it produce and vote on its own fork.
info!("Restart validator C again!!!");
let val_c_ledger_path = validator_c_info.info.ledger_path.clone();
cluster.restart_node(
&validator_c_pubkey,
validator_c_info,
@@ -2843,7 +2890,7 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
assert!(!votes_on_c_fork.is_empty());
info!("collected validator C's votes: {:?}", votes_on_c_fork);
// Step 4:
// Step 5:
// verify whether there was violation or not
info!("Restart validator A again!!!");
cluster.restart_node(