Only dump duplicate descendants in dump & repair (#31559)

This commit is contained in:
Ashwin Sekar 2023-06-21 11:28:42 -07:00 committed by GitHub
parent 42ccc5cf40
commit 8135cf35bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 230 additions and 17 deletions

View File

@ -1498,7 +1498,7 @@ impl ReplayStage {
.iter()
.chain(std::iter::once(&duplicate_slot))
.map(|slot| {
// Clear the duplicate banks from BankForks
// Clear the banks from BankForks
let bank = w_bank_forks
.remove(*slot)
.expect("BankForks should not have been purged yet");
@ -1518,20 +1518,38 @@ impl ReplayStage {
drop(removed_banks);
for (slot, slot_id) in slots_to_purge {
warn!(
"purging descendant: {} with slot_id {}, of slot {}",
slot, slot_id, duplicate_slot
);
// Clear the slot signatures from status cache for this slot.
// TODO: What about RPC queries that had already cloned the Bank for this slot
// and are looking up the signature for this slot?
root_bank.clear_slot_signatures(slot);
// Clear the slot-related data in blockstore. This will:
// 1) Clear old shreds allowing new ones to be inserted
// 2) Clear the "dead" flag allowing ReplayStage to start replaying
// this slot
blockstore.clear_unconfirmed_slot(slot);
if let Some(bank_hash) = blockstore.get_bank_hash(slot) {
// If a descendant was successfully replayed and chained from a duplicate it must
// also be a duplicate. In this case we *need* to repair it, so we clear from
// blockstore.
warn!(
"purging duplicate descendant: {} with slot_id {} and bank hash {}, of slot {}",
slot, slot_id, bank_hash, duplicate_slot
);
// Clear the slot-related data in blockstore. This will:
// 1) Clear old shreds allowing new ones to be inserted
// 2) Clear the "dead" flag allowing ReplayStage to start replaying
// this slot
blockstore.clear_unconfirmed_slot(slot);
} else if slot == duplicate_slot {
warn!("purging duplicate slot: {} with slot_id {}", slot, slot_id);
blockstore.clear_unconfirmed_slot(slot);
} else {
// If a descendant was unable to replay and chained from a duplicate, it is not
// necessary to repair it. It is most likely that this block is fine, and will
// replay on successful repair of the parent. If this block is also a duplicate, it
// will be handled in the next round of repair/replay - so we just clear the dead
// flag for now.
warn!("not purging descendant {} of slot {} as it is dead. resetting dead flag instead", slot, duplicate_slot);
// Clear the "dead" flag allowing ReplayStage to start replaying
// this slot once the parent is repaired
blockstore.remove_dead_slot(slot).unwrap();
}
// Clear the progress map of these forks
let _ = progress.remove(&slot);
@ -5730,12 +5748,16 @@ pub(crate) mod tests {
assert!(bank7.get_signature_status(&vote_tx.signatures[0]).is_some());
assert!(bank7.get_signature_status(&transfer_sig).is_some());
// Mark slot 7 dead
// Give all slots a bank hash but mark slot 7 dead
for i in 0..=6 {
blockstore.insert_bank_hash(i, Hash::new_unique(), false);
}
blockstore
.set_dead_slot(7)
.expect("Failed to mark slot as dead in blockstore");
// Purging slot 5 should purge only slots 5 and its descendants 6,7
// Purging slot 5 should purge only slots 5 and its descendant 6. Since 7 is already dead,
// it gets reset but not removed
ReplayStage::purge_unconfirmed_duplicate_slot(
5,
&mut ancestors,
@ -5755,13 +5777,16 @@ pub(crate) mod tests {
}
// Blockstore should have been cleared
for slot in &[5, 6, 7] {
for slot in &[5, 6] {
assert!(!blockstore.is_full(*slot));
// Slot 7 was marked dead before, should no longer be marked
assert!(!blockstore.is_dead(*slot));
assert!(blockstore.get_slot_entries(*slot, 0).unwrap().is_empty());
}
// Slot 7 was marked dead before, should no longer be marked
assert!(!blockstore.is_dead(7));
assert!(!blockstore.get_slot_entries(7, 0).unwrap().is_empty());
// Should not be able to find signature in slot 5 for previously
// processed transactions
assert!(bank7.get_signature_status(&vote_tx.signatures[0]).is_none());
@ -5782,7 +5807,7 @@ pub(crate) mod tests {
&bank_forks,
&blockstore,
);
for i in 4..=7 {
for i in 4..=6 {
assert!(bank_forks.read().unwrap().get(i).is_none());
assert!(progress.get(&i).is_none());
assert!(blockstore.get_slot_entries(i, 0).unwrap().is_empty());
@ -5793,7 +5818,7 @@ pub(crate) mod tests {
assert!(!blockstore.get_slot_entries(i, 0).unwrap().is_empty());
}
// Purging slot 1 should purge both forks 2 and 3
// Purging slot 1 should purge both forks 2 and 3 but leave 7 untouched as it is dead
let mut descendants = bank_forks.read().unwrap().descendants();
let mut ancestors = bank_forks.read().unwrap().ancestors();
ReplayStage::purge_unconfirmed_duplicate_slot(
@ -5805,13 +5830,201 @@ pub(crate) mod tests {
&bank_forks,
&blockstore,
);
for i in 1..=7 {
for i in 1..=6 {
assert!(bank_forks.read().unwrap().get(i).is_none());
assert!(progress.get(&i).is_none());
assert!(blockstore.get_slot_entries(i, 0).unwrap().is_empty());
}
assert!(bank_forks.read().unwrap().get(0).is_some());
assert!(progress.get(&0).is_some());
// Slot 7 untouched
assert!(!blockstore.is_dead(7));
assert!(!blockstore.get_slot_entries(7, 0).unwrap().is_empty());
}
#[test]
fn test_purge_unconfirmed_duplicate_slots_and_reattach() {
let ReplayBlockstoreComponents {
blockstore,
validator_node_to_vote_keys,
vote_simulator,
leader_schedule_cache,
rpc_subscriptions,
..
} = replay_blockstore_components(
Some(tr(0) / (tr(1) / (tr(2) / (tr(4))) / (tr(3) / (tr(5) / (tr(6)))))),
1,
None::<GenerateVotes>,
);
let VoteSimulator {
bank_forks,
mut progress,
..
} = vote_simulator;
let mut replay_timing = ReplayTiming::default();
// Create bank 7 and insert to blockstore and bank forks
let root_bank = bank_forks.read().unwrap().root_bank();
let bank7 = Bank::new_from_parent(
&bank_forks.read().unwrap().get(6).unwrap(),
&Pubkey::default(),
7,
);
bank_forks.write().unwrap().insert(bank7);
blockstore.add_tree(tr(6) / tr(7), false, false, 3, Hash::default());
let mut descendants = bank_forks.read().unwrap().descendants();
let mut ancestors = bank_forks.read().unwrap().ancestors();
// Mark earlier slots as frozen, but we have the wrong version of slots 3 and 5, so slot 6 is dead and
// slot 7 is unreplayed
for i in 0..=5 {
blockstore.insert_bank_hash(i, Hash::new_unique(), false);
}
blockstore
.set_dead_slot(6)
.expect("Failed to mark slot 6 as dead in blockstore");
// Purge slot 3 as it is duplicate, this should also purge slot 5 but not touch 6 and 7
ReplayStage::purge_unconfirmed_duplicate_slot(
3,
&mut ancestors,
&mut descendants,
&mut progress,
&root_bank,
&bank_forks,
&blockstore,
);
for slot in &[3, 5, 6, 7] {
assert!(bank_forks.read().unwrap().get(*slot).is_none());
assert!(progress.get(slot).is_none());
}
for slot in &[3, 5] {
assert!(!blockstore.is_full(*slot));
assert!(!blockstore.is_dead(*slot));
assert!(blockstore.get_slot_entries(*slot, 0).unwrap().is_empty());
}
for slot in 6..=7 {
assert!(!blockstore.is_dead(slot));
assert!(!blockstore.get_slot_entries(slot, 0).unwrap().is_empty())
}
// Simulate repair fixing slot 3 and 5
let (shreds, _) = make_slot_entries(
3, // slot
1, // parent_slot
8, // num_entries
true, // merkle_variant
);
blockstore.insert_shreds(shreds, None, false).unwrap();
let (shreds, _) = make_slot_entries(
5, // slot
3, // parent_slot
8, // num_entries
true, // merkle_variant
);
blockstore.insert_shreds(shreds, None, false).unwrap();
// 3 should now be an active bank
ReplayStage::generate_new_bank_forks(
&blockstore,
&bank_forks,
&leader_schedule_cache,
&rpc_subscriptions,
&mut progress,
&mut replay_timing,
);
assert_eq!(bank_forks.read().unwrap().active_bank_slots(), vec![3]);
// Freeze 3
{
let bank3 = bank_forks.read().unwrap().get(3).unwrap();
progress.insert(
3,
ForkProgress::new_from_bank(
&bank3,
bank3.collector_id(),
validator_node_to_vote_keys
.get(bank3.collector_id())
.unwrap(),
Some(1),
0,
0,
),
);
bank3.freeze();
}
// 5 Should now be an active bank
ReplayStage::generate_new_bank_forks(
&blockstore,
&bank_forks,
&leader_schedule_cache,
&rpc_subscriptions,
&mut progress,
&mut replay_timing,
);
assert_eq!(bank_forks.read().unwrap().active_bank_slots(), vec![5]);
// Freeze 5
{
let bank5 = bank_forks.read().unwrap().get(5).unwrap();
progress.insert(
5,
ForkProgress::new_from_bank(
&bank5,
bank5.collector_id(),
validator_node_to_vote_keys
.get(bank5.collector_id())
.unwrap(),
Some(3),
0,
0,
),
);
bank5.freeze();
}
// 6 should now be an active bank even though we haven't repaired it because it
// wasn't dumped
ReplayStage::generate_new_bank_forks(
&blockstore,
&bank_forks,
&leader_schedule_cache,
&rpc_subscriptions,
&mut progress,
&mut replay_timing,
);
assert_eq!(bank_forks.read().unwrap().active_bank_slots(), vec![6]);
// Freeze 6 now that we have the correct version of 5.
{
let bank6 = bank_forks.read().unwrap().get(6).unwrap();
progress.insert(
6,
ForkProgress::new_from_bank(
&bank6,
bank6.collector_id(),
validator_node_to_vote_keys
.get(bank6.collector_id())
.unwrap(),
Some(5),
0,
0,
),
);
bank6.freeze();
}
// 7 should be found as an active bank
ReplayStage::generate_new_bank_forks(
&blockstore,
&bank_forks,
&leader_schedule_cache,
&rpc_subscriptions,
&mut progress,
&mut replay_timing,
);
assert_eq!(bank_forks.read().unwrap().active_bank_slots(), vec![7]);
}
#[test]