Add frozen hashes and marking DuplicateConfirmed in blockstore to state machine (#18648)

carllin 2021-07-18 17:04:25 -07:00 committed by GitHub
parent b9dc85a934
commit ce467bea20
5 changed files with 1075 additions and 590 deletions

File diff suppressed because it is too large.


@@ -40,5 +40,10 @@ pub trait ForkChoice {
fn mark_fork_invalid_candidate(&mut self, invalid_slot: &Self::ForkChoiceKey);
fn mark_fork_valid_candidate(&mut self, valid_slot: &Self::ForkChoiceKey);
/// Returns any newly duplicate confirmed ancestors of `valid_slot` up to and including
/// `valid_slot` itself
fn mark_fork_valid_candidate(
&mut self,
valid_slot: &Self::ForkChoiceKey,
) -> Vec<Self::ForkChoiceKey>;
}
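
The trait change obligates every `ForkChoice` implementor to report which keys became duplicate confirmed as a result of marking the fork valid. Below is a minimal, self-contained sketch of that contract over a toy parent map; `ToyForkChoice`, its fields, and the simplified `SlotHashKey` alias are illustrative stand-ins, not the crate's real `HeaviestSubtreeForkChoice`:

```rust
use std::collections::{HashMap, HashSet};

/// Stand-in for the real `SlotHashKey = (Slot, Hash)`, simplified so the sketch is self-contained.
type SlotHashKey = (u64, u64);

pub trait ForkChoice {
    type ForkChoiceKey;

    /// Returns any newly duplicate confirmed ancestors of `valid_slot`,
    /// up to and including `valid_slot` itself.
    fn mark_fork_valid_candidate(
        &mut self,
        valid_slot: &Self::ForkChoiceKey,
    ) -> Vec<Self::ForkChoiceKey>;
}

/// Toy implementor over an explicit parent map (not the real fork choice tree).
#[derive(Default)]
struct ToyForkChoice {
    parent: HashMap<SlotHashKey, SlotHashKey>,
    duplicate_confirmed: HashSet<SlotHashKey>,
}

impl ForkChoice for ToyForkChoice {
    type ForkChoiceKey = SlotHashKey;

    fn mark_fork_valid_candidate(&mut self, valid_slot: &SlotHashKey) -> Vec<SlotHashKey> {
        let mut newly_confirmed = vec![];
        let mut current = Some(*valid_slot);
        // Walk from `valid_slot` through its ancestors; anything not already
        // duplicate confirmed becomes confirmed now and is reported to the caller.
        while let Some(key) = current {
            if self.duplicate_confirmed.insert(key) {
                newly_confirmed.push(key);
            }
            current = self.parent.get(&key).copied();
        }
        newly_confirmed
    }
}

fn main() {
    let mut fork_choice = ToyForkChoice::default();
    fork_choice.parent.insert((2, 2), (1, 1));
    fork_choice.parent.insert((1, 1), (0, 0));
    // Confirming (2, 2) also newly confirms its ancestors (1, 1) and (0, 0).
    assert_eq!(
        fork_choice.mark_fork_valid_candidate(&(2, 2)),
        vec![(2, 2), (1, 1), (0, 0)]
    );
}
```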


@@ -1007,11 +1007,21 @@ impl ForkChoice for HeaviestSubtreeForkChoice {
}
}
fn mark_fork_valid_candidate(&mut self, valid_slot_hash_key: &SlotHashKey) {
fn mark_fork_valid_candidate(&mut self, valid_slot_hash_key: &SlotHashKey) -> Vec<SlotHashKey> {
info!(
"marking fork starting at: {:?} valid candidate",
valid_slot_hash_key
);
let mut newly_duplicate_confirmed_ancestors = vec![];
for ancestor_key in std::iter::once(*valid_slot_hash_key)
.chain(self.ancestor_iterator(*valid_slot_hash_key))
{
if !self.is_duplicate_confirmed(&ancestor_key).unwrap() {
newly_duplicate_confirmed_ancestors.push(ancestor_key);
}
}
let mut update_operations = UpdateOperations::default();
// Notify all the children of this node that a parent was marked as valid
for child_hash_key in self.subtree_diff(*valid_slot_hash_key, SlotHashKey::default()) {
@@ -1025,6 +1035,7 @@ impl ForkChoice for HeaviestSubtreeForkChoice {
// Aggregate across all ancestors to find the new best slots including this fork
self.insert_aggregate_operations(&mut update_operations, *valid_slot_hash_key);
self.process_update_operations(update_operations);
newly_duplicate_confirmed_ancestors
}
}
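
The vector returned above is what lets the caller record duplicate confirmation outside the in-memory fork choice tree; per the commit title, the actual consumer (in the suppressed state-machine diff) writes this status to `Blockstore`. The following sketch only shows the shape of that consumption against an invented in-memory store; `RecordStore` and `set_duplicate_confirmed` are illustrative names, not the real `Blockstore` API:

```rust
use std::collections::HashMap;

type Slot = u64;
type Hash = [u8; 32];
type SlotHashKey = (Slot, Hash);

/// Invented in-memory stand-in for the persistent store; the real commit
/// records duplicate confirmed slots in `Blockstore`, whose API is not shown
/// in this diff.
#[derive(Default)]
struct RecordStore {
    duplicate_confirmed: HashMap<Slot, Hash>,
}

impl RecordStore {
    fn set_duplicate_confirmed(&mut self, slot: Slot, hash: Hash) {
        self.duplicate_confirmed.insert(slot, hash);
    }
}

/// Persist every slot that fork choice just reported as newly duplicate
/// confirmed, so the status is visible outside the fork choice tree.
fn persist_newly_confirmed(store: &mut RecordStore, newly_confirmed: &[SlotHashKey]) {
    for (slot, hash) in newly_confirmed {
        store.set_duplicate_confirmed(*slot, *hash);
    }
}

fn main() {
    let mut store = RecordStore::default();
    persist_newly_confirmed(&mut store, &[(3, [0u8; 32]), (2, [1u8; 32])]);
    assert_eq!(store.duplicate_confirmed.len(), 2);
}
```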


@@ -55,7 +55,7 @@ use solana_sdk::{
};
use solana_vote_program::vote_state::Vote;
use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{HashMap, HashSet},
result,
sync::{
atomic::{AtomicBool, Ordering},
@@ -427,6 +427,7 @@ impl ReplayStage {
let mut process_gossip_duplicate_confirmed_slots_time = Measure::start("process_gossip_duplicate_confirmed_slots");
Self::process_gossip_duplicate_confirmed_slots(
&gossip_duplicate_confirmed_slots_receiver,
&blockstore,
&mut duplicate_slots_tracker,
&mut gossip_duplicate_confirmed_slots,
&bank_forks,
@@ -455,6 +456,7 @@ impl ReplayStage {
let mut process_duplicate_slots_time = Measure::start("process_duplicate_slots");
if !tpu_has_bank {
Self::process_duplicate_slots(
&blockstore,
&duplicate_slots_receiver,
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
@@ -503,7 +505,7 @@
&bank_forks,
);
Self::mark_slots_confirmed(&confirmed_forks, &bank_forks, &mut progress, &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair);
Self::mark_slots_confirmed(&confirmed_forks, &blockstore, &bank_forks, &mut progress, &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair);
}
compute_slot_stats_time.stop();
@@ -1048,6 +1050,7 @@ impl ReplayStage {
// for duplicate slot recovery.
fn process_gossip_duplicate_confirmed_slots(
gossip_duplicate_confirmed_slots_receiver: &GossipDuplicateConfirmedSlotsReceiver,
blockstore: &Blockstore,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
bank_forks: &RwLock<BankForks>,
@@ -1057,27 +1060,30 @@ impl ReplayStage {
) {
let root = bank_forks.read().unwrap().root();
for new_confirmed_slots in gossip_duplicate_confirmed_slots_receiver.try_iter() {
for (confirmed_slot, confirmed_hash) in new_confirmed_slots {
for (confirmed_slot, duplicate_confirmed_hash) in new_confirmed_slots {
if confirmed_slot <= root {
continue;
} else if let Some(prev_hash) =
gossip_duplicate_confirmed_slots.insert(confirmed_slot, confirmed_hash)
} else if let Some(prev_hash) = gossip_duplicate_confirmed_slots
.insert(confirmed_slot, duplicate_confirmed_hash)
{
assert_eq!(prev_hash, confirmed_hash);
assert_eq!(prev_hash, duplicate_confirmed_hash);
// Already processed this signal
return;
}
let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state(
duplicate_confirmed_hash,
|| progress.is_dead(confirmed_slot).unwrap_or(false),
|| bank_forks.read().unwrap().bank_hash(confirmed_slot),
);
check_slot_agrees_with_cluster(
confirmed_slot,
root,
bank_forks.read().unwrap().bank_hash(confirmed_slot),
blockstore,
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
progress,
fork_choice,
duplicate_slots_to_repair,
SlotStateUpdate::DuplicateConfirmed,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
);
}
}
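
Throughout this file the bare `SlotStateUpdate::DuplicateConfirmed` / `Duplicate` / `Dead` / `Frozen` tags are replaced by variants that carry a snapshot built with `new_from_state`. The real definitions live in the suppressed state-machine file, so the sketch below is only an assumed shape inferred from the call sites above; the field names and exact payloads are guesses:

```rust
#![allow(dead_code)]

type Slot = u64;
type Hash = [u8; 32];

/// Assumed payload for a duplicate-confirmation event, inferred from the
/// `new_from_state(hash, is_dead, get_bank_hash)` call sites; the real struct
/// may carry different fields.
struct DuplicateConfirmedState {
    duplicate_confirmed_hash: Hash,
    is_slot_dead: bool,
    bank_hash: Option<Hash>,
}

impl DuplicateConfirmedState {
    // The closures defer the progress-map / bank-forks lookups to construction
    // time, mirroring the call shape used throughout replay_stage.
    fn new_from_state(
        duplicate_confirmed_hash: Hash,
        is_dead: impl FnOnce() -> bool,
        get_bank_hash: impl FnOnce() -> Option<Hash>,
    ) -> Self {
        Self {
            duplicate_confirmed_hash,
            is_slot_dead: is_dead(),
            bank_hash: get_bank_hash(),
        }
    }
}

// Placeholder payloads for the other variants used in this file.
struct BankFrozenState { slot: Slot, frozen_hash: Hash }
struct DeadState { slot: Slot }
struct DuplicateState { slot: Slot, bank_hash: Option<Hash> }

/// Assumed overall shape after this commit: each event carries its snapshot
/// instead of being a bare tag.
enum SlotStateUpdate {
    BankFrozen(BankFrozenState),
    DuplicateConfirmed(DuplicateConfirmedState),
    Dead(DeadState),
    Duplicate(DuplicateState),
}

fn main() {
    let hash = [7u8; 32];
    let state = DuplicateConfirmedState::new_from_state(hash, || false, || Some(hash));
    let _update = SlotStateUpdate::DuplicateConfirmed(state);
}
```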
@@ -1104,6 +1110,7 @@ impl ReplayStage {
// Checks for and handle forks with duplicate slots.
fn process_duplicate_slots(
blockstore: &Blockstore,
duplicate_slots_receiver: &DuplicateSlotReceiver,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
@@ -1126,16 +1133,21 @@ impl ReplayStage {
new_duplicate_slots.into_iter().zip(bank_hashes.into_iter())
{
// WindowService should only send the signal once per slot
let duplicate_state = DuplicateState::new_from_state(
duplicate_slot,
gossip_duplicate_confirmed_slots,
fork_choice,
|| progress.is_dead(duplicate_slot).unwrap_or(false),
|| bank_hash,
);
check_slot_agrees_with_cluster(
duplicate_slot,
root_slot,
bank_hash,
blockstore,
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
progress,
fork_choice,
duplicate_slots_to_repair,
SlotStateUpdate::Duplicate,
SlotStateUpdate::Duplicate(duplicate_state),
);
}
}
@@ -1417,16 +1429,20 @@ impl ReplayStage {
err: format!("error: {:?}", err),
timestamp: timestamp(),
});
let dead_state = DeadState::new_from_state(
slot,
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
heaviest_subtree_fork_choice,
);
check_slot_agrees_with_cluster(
slot,
root,
Some(bank.hash()),
blockstore,
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
progress,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
SlotStateUpdate::Dead,
SlotStateUpdate::Dead(dead_state),
);
}
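
`mark_dead_slot` now snapshots a `DeadState` and hands `duplicate_slots_to_repair` to the state machine. One plausible reading, hedged here because the actual handling is in the suppressed file, is that a slot we marked dead but which the cluster has already duplicate confirmed cannot stay dead locally and must be queued for repair. A toy version of that check, using the `BTreeMap<Slot, Hash>` and `HashSet<(Slot, Hash)>` shapes visible elsewhere in this diff:

```rust
use std::collections::{BTreeMap, HashSet};

type Slot = u64;
type Hash = [u8; 32];

/// Toy sketch only: if the cluster has duplicate confirmed a hash for a slot
/// we just marked dead, queue that (slot, hash) for repair so the dead local
/// version can be replaced. The real decision logic is not shown in this diff.
fn on_slot_dead(
    dead_slot: Slot,
    gossip_duplicate_confirmed_slots: &BTreeMap<Slot, Hash>,
    duplicate_slots_to_repair: &mut HashSet<(Slot, Hash)>,
) {
    if let Some(confirmed_hash) = gossip_duplicate_confirmed_slots.get(&dead_slot) {
        duplicate_slots_to_repair.insert((dead_slot, *confirmed_hash));
    }
}

fn main() {
    let mut confirmed = BTreeMap::new();
    confirmed.insert(5, [9u8; 32]);
    let mut to_repair = HashSet::new();
    on_slot_dead(5, &confirmed, &mut to_repair);
    assert!(to_repair.contains(&(5, [9u8; 32])));
}
```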
@@ -1926,16 +1942,21 @@ impl ReplayStage {
.get_fork_stats_mut(bank.slot())
.expect("All frozen banks must exist in the Progress map")
.bank_hash = Some(bank.hash());
let bank_frozen_state = BankFrozenState::new_from_state(
bank.slot(),
bank.hash(),
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
heaviest_subtree_fork_choice,
);
check_slot_agrees_with_cluster(
bank.slot(),
bank_forks.read().unwrap().root(),
Some(bank.hash()),
blockstore,
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
progress,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
SlotStateUpdate::Frozen,
SlotStateUpdate::BankFrozen(bank_frozen_state),
);
if let Some(sender) = bank_notification_sender {
sender
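
This hunk is the "frozen hashes" half of the commit title: when a bank freezes, the state machine now receives the frozen hash via `BankFrozenState`, so it can be checked against any hash the cluster has duplicate confirmed for that slot. A toy decision capturing that comparison, illustrative only since the real logic lives in the suppressed state-machine file:

```rust
type Hash = [u8; 32];

/// Possible outcomes of freezing a bank, in this simplified model.
enum FrozenOutcome {
    /// Our replayed hash matches what the cluster duplicate confirmed.
    MatchesCluster,
    /// The cluster duplicate confirmed a different hash: our version should be
    /// dumped and the confirmed version repaired.
    MismatchRepair(Hash),
    /// No duplicate-confirmation signal for this slot yet; nothing to compare.
    NoSignalYet,
}

fn on_bank_frozen(frozen_hash: Hash, cluster_confirmed_hash: Option<Hash>) -> FrozenOutcome {
    match cluster_confirmed_hash {
        Some(confirmed) if confirmed == frozen_hash => FrozenOutcome::MatchesCluster,
        Some(confirmed) => FrozenOutcome::MismatchRepair(confirmed),
        None => FrozenOutcome::NoSignalYet,
    }
}

fn main() {
    let ours = [1u8; 32];
    let theirs = [2u8; 32];
    assert!(matches!(on_bank_frozen(ours, Some(ours)), FrozenOutcome::MatchesCluster));
    assert!(matches!(on_bank_frozen(ours, Some(theirs)), FrozenOutcome::MismatchRepair(_)));
    assert!(matches!(on_bank_frozen(ours, None), FrozenOutcome::NoSignalYet));
}
```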
@@ -2444,41 +2465,39 @@ impl ReplayStage {
}
fn mark_slots_confirmed(
confirmed_forks: &[Slot],
confirmed_forks: &[(Slot, Hash)],
blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>,
progress: &mut ProgressMap,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
fork_choice: &mut HeaviestSubtreeForkChoice,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
) {
let (root_slot, bank_hashes) = {
let r_bank_forks = bank_forks.read().unwrap();
let bank_hashes: Vec<Option<Hash>> = confirmed_forks
.iter()
.map(|slot| r_bank_forks.bank_hash(*slot))
.collect();
(r_bank_forks.root(), bank_hashes)
};
for (slot, bank_hash) in confirmed_forks.iter().zip(bank_hashes.into_iter()) {
let root_slot = bank_forks.read().unwrap().root();
for (slot, frozen_hash) in confirmed_forks.iter() {
// This case should be guaranteed as false by confirm_forks()
if let Some(false) = progress.is_supermajority_confirmed(*slot) {
// Because supermajority confirmation will iterate through and update the
// subtree in fork choice, only incur this cost if the slot wasn't already
// confirmed
progress.set_supermajority_confirmed_slot(*slot);
// If the slot was confirmed, then it must be frozen. Otherwise, we couldn't
// have replayed any of its descendants and figured out it was confirmed.
assert!(*frozen_hash != Hash::default());
let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state(
*frozen_hash,
|| false,
|| Some(*frozen_hash),
);
check_slot_agrees_with_cluster(
*slot,
root_slot,
bank_hash,
blockstore,
duplicate_slots_tracker,
// Don't need to pass the gossip confirmed slots since `slot`
// is already marked as confirmed in progress
&BTreeMap::new(),
progress,
fork_choice,
duplicate_slots_to_repair,
SlotStateUpdate::DuplicateConfirmed,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
);
}
}
@@ -2490,7 +2509,7 @@ impl ReplayStage {
total_stake: Stake,
progress: &ProgressMap,
bank_forks: &RwLock<BankForks>,
) -> Vec<Slot> {
) -> Vec<(Slot, Hash)> {
let mut confirmed_forks = vec![];
for (slot, prog) in progress.iter() {
if !prog.fork_stats.is_supermajority_confirmed {
@@ -2504,7 +2523,7 @@ impl ReplayStage {
if bank.is_frozen() && tower.is_slot_confirmed(*slot, voted_stakes, total_stake) {
info!("validator fork confirmed {} {}ms", *slot, duration);
datapoint_info!("validator-confirmation", ("duration_ms", duration, i64));
confirmed_forks.push(*slot);
confirmed_forks.push((*slot, bank.hash()));
} else {
debug!(
"validator fork not confirmed {} {}ms {:?}",
@@ -2673,7 +2692,7 @@ impl ReplayStage {
}
#[cfg(test)]
mod tests {
pub mod tests {
use super::*;
use crate::{
consensus::Tower,
@@ -3247,14 +3266,14 @@ mod tests {
#[test]
fn test_dead_fork_trailing_entry() {
let keypair = Keypair::new();
let res = check_dead_fork(|genesis_keypair, bank| {
let res = check_dead_fork(|funded_keypair, bank| {
let blockhash = bank.last_blockhash();
let slot = bank.slot();
let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
let mut entries =
entry::create_ticks(bank.ticks_per_slot(), hashes_per_tick, blockhash);
let last_entry_hash = entries.last().unwrap().hash;
let tx = system_transaction::transfer(genesis_keypair, &keypair.pubkey(), 2, blockhash);
let tx = system_transaction::transfer(funded_keypair, &keypair.pubkey(), 2, blockhash);
let trailing_entry = entry::next_entry(&last_entry_hash, 1, vec![tx]);
entries.push(trailing_entry);
entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true, 0)
@@ -3270,14 +3289,19 @@ mod tests {
#[test]
fn test_dead_fork_entry_deserialize_failure() {
// Insert entry that causes deserialization failure
let res = check_dead_fork(|_, _| {
let res = check_dead_fork(|_, bank| {
let gibberish = [0xa5u8; PACKET_DATA_SIZE];
let mut data_header = DataShredHeader::default();
data_header.flags |= DATA_COMPLETE_SHRED;
// Need to provide the right size for Shredder::deshred.
data_header.size = SIZE_OF_DATA_SHRED_PAYLOAD as u16;
data_header.parent_offset = (bank.slot() - bank.parent_slot()) as u16;
let shred_common_header = ShredCommonHeader {
slot: bank.slot(),
..ShredCommonHeader::default()
};
let mut shred = Shred::new_empty_from_header(
ShredCommonHeader::default(),
shred_common_header,
data_header,
CodingShredHeader::default(),
);
@@ -3306,37 +3330,43 @@ mod tests {
let ledger_path = get_tmp_ledger_path!();
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let res = {
let blockstore = Arc::new(
Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger"),
);
let GenesisConfigInfo {
mut genesis_config,
mint_keypair,
let ReplayBlockstoreComponents {
blockstore,
vote_simulator,
..
} = create_genesis_config(1000);
genesis_config.poh_config.hashes_per_tick = Some(2);
let bank_forks = BankForks::new(Bank::new(&genesis_config));
let bank0 = bank_forks.working_bank();
let mut progress = ProgressMap::default();
let last_blockhash = bank0.last_blockhash();
let mut bank0_progress = progress
.entry(bank0.slot())
.or_insert_with(|| ForkProgress::new(last_blockhash, None, None, 0, 0));
let shreds = shred_to_insert(&mint_keypair, bank0.clone());
} = replay_blockstore_components(Some(tr(0)), 1, None);
let VoteSimulator {
mut progress,
bank_forks,
mut heaviest_subtree_fork_choice,
validator_keypairs,
..
} = vote_simulator;
let bank0 = bank_forks.read().unwrap().get(0).cloned().unwrap();
assert!(bank0.is_frozen());
assert_eq!(bank0.tick_height(), bank0.max_tick_height());
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let bank1 = bank_forks.read().unwrap().get(1).cloned().unwrap();
let mut bank1_progress = progress
.entry(bank1.slot())
.or_insert_with(|| ForkProgress::new(bank1.last_blockhash(), None, None, 0, 0));
let shreds = shred_to_insert(
&validator_keypairs.values().next().unwrap().node_keypair,
bank1.clone(),
);
blockstore.insert_shreds(shreds, None, false).unwrap();
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let bank_forks = Arc::new(RwLock::new(bank_forks));
let exit = Arc::new(AtomicBool::new(false));
let res = ReplayStage::replay_blockstore_into_bank(
&bank0,
&bank1,
&blockstore,
&mut bank0_progress,
&mut bank1_progress,
None,
&replay_vote_sender,
&VerifyRecyclers::default(),
);
let rpc_subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
@@ -3346,26 +3376,26 @@ mod tests {
if let Err(err) = &res {
ReplayStage::mark_dead_slot(
&blockstore,
&bank0,
&bank1,
0,
err,
&rpc_subscriptions,
&mut DuplicateSlotsTracker::default(),
&GossipDuplicateConfirmedSlots::default(),
&mut progress,
&mut HeaviestSubtreeForkChoice::new((0, Hash::default())),
&mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(),
);
}
// Check that the erroring bank was marked as dead in the progress map
assert!(progress
.get(&bank0.slot())
.get(&bank1.slot())
.map(|b| b.is_dead)
.unwrap_or(false));
// Check that the erroring bank was marked as dead in blockstore
assert!(blockstore.is_dead(bank0.slot()));
assert!(blockstore.is_dead(bank1.slot()));
res.map(|_| ())
};
let _ignored = remove_dir_all(&ledger_path);
@@ -3659,7 +3689,7 @@ mod tests {
&bank_forks,
);
// No new stats should have been computed
assert_eq!(confirmed_forks, vec![0]);
assert_eq!(confirmed_forks, vec![(0, bank0.hash())]);
}
let ancestors = bank_forks.read().unwrap().ancestors();
@@ -4785,16 +4815,21 @@ mod tests {
let mut gossip_duplicate_confirmed_slots = GossipDuplicateConfirmedSlots::default();
let bank4_hash = bank_forks.read().unwrap().bank_hash(4).unwrap();
assert_ne!(bank4_hash, Hash::default());
let duplicate_state = DuplicateState::new_from_state(
4,
&gossip_duplicate_confirmed_slots,
&mut vote_simulator.heaviest_subtree_fork_choice,
|| progress.is_dead(4).unwrap_or(false),
|| Some(bank4_hash),
);
check_slot_agrees_with_cluster(
4,
bank_forks.read().unwrap().root(),
Some(bank4_hash),
&blockstore,
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&progress,
&mut vote_simulator.heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(),
SlotStateUpdate::Duplicate,
SlotStateUpdate::Duplicate(duplicate_state),
);
let (vote_fork, reset_fork) = run_compute_and_select_forks(
@@ -4811,16 +4846,21 @@ mod tests {
blockstore.store_duplicate_slot(2, vec![], vec![]).unwrap();
let bank2_hash = bank_forks.read().unwrap().bank_hash(2).unwrap();
assert_ne!(bank2_hash, Hash::default());
let duplicate_state = DuplicateState::new_from_state(
2,
&gossip_duplicate_confirmed_slots,
&mut vote_simulator.heaviest_subtree_fork_choice,
|| progress.is_dead(2).unwrap_or(false),
|| Some(bank2_hash),
);
check_slot_agrees_with_cluster(
2,
bank_forks.read().unwrap().root(),
Some(bank2_hash),
&blockstore,
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&progress,
&mut vote_simulator.heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(),
SlotStateUpdate::Duplicate,
SlotStateUpdate::Duplicate(duplicate_state),
);
let (vote_fork, reset_fork) = run_compute_and_select_forks(
@@ -4840,16 +4880,19 @@ mod tests {
// then slot 4 is now the heaviest bank again
let mut duplicate_slots_to_repair = HashSet::new();
gossip_duplicate_confirmed_slots.insert(4, bank4_hash);
let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state(
bank4_hash,
|| progress.is_dead(4).unwrap_or(false),
|| Some(bank4_hash),
);
check_slot_agrees_with_cluster(
4,
bank_forks.read().unwrap().root(),
Some(bank4_hash),
&blockstore,
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&progress,
&mut vote_simulator.heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair,
SlotStateUpdate::DuplicateConfirmed,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
);
// The confirmed hash is detected in `progress`, which means
// it's confirmation on the replayed block. This means we have
@@ -4980,16 +5023,19 @@ mod tests {
// Mark fork choice branch as invalid so select forks below doesn't panic
// on a nonexistent `heaviest_bank_on_same_fork` after we dump the duplicate fork.
let duplicate_confirmed_state = DuplicateConfirmedState::new_from_state(
duplicate_confirmed_bank2_hash,
|| progress.is_dead(2).unwrap_or(false),
|| Some(our_bank2_hash),
);
check_slot_agrees_with_cluster(
2,
bank_forks.read().unwrap().root(),
Some(our_bank2_hash),
blockstore,
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
progress,
heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair,
SlotStateUpdate::DuplicateConfirmed,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
);
assert!(duplicate_slots_to_repair.contains(&(2, duplicate_confirmed_bank2_hash)));
let mut ancestors = bank_forks.read().unwrap().ancestors();
@@ -5613,7 +5659,7 @@ mod tests {
type GenerateVotes = Box<dyn Fn(Vec<Pubkey>) -> HashMap<Pubkey, Vec<Slot>>>;
fn setup_forks_from_tree(
pub fn setup_forks_from_tree(
tree: Tree<Slot>,
num_keys: usize,
generate_votes: Option<GenerateVotes>,


@@ -104,6 +104,9 @@ impl VoteSimulator {
.any(|lockout| lockout.slot == parent));
}
}
while new_bank.tick_height() < new_bank.max_tick_height() {
new_bank.register_tick(&Hash::new_unique());
}
new_bank.freeze();
self.progress
.get_fork_stats_mut(new_bank.slot())
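
The loop added in the hunk above ticks each simulated bank to its max tick height before freezing, so the banks built by `VoteSimulator` look like fully replayed slots (the updated replay test asserts exactly this for bank 0). A toy model of the invariant, with a stand-in `ToyBank` rather than the real `solana_runtime::Bank`:

```rust
/// Stand-in bank for illustration; not the real runtime Bank.
struct ToyBank {
    tick_height: u64,
    max_tick_height: u64,
    frozen: bool,
}

impl ToyBank {
    fn register_tick(&mut self) {
        self.tick_height += 1;
    }
    fn freeze(&mut self) {
        self.frozen = true;
    }
}

fn main() {
    let mut bank = ToyBank { tick_height: 0, max_tick_height: 64, frozen: false };
    // Mirror the VoteSimulator change: tick until the slot is complete, then
    // freeze, so downstream asserts like tick_height() == max_tick_height() hold.
    while bank.tick_height < bank.max_tick_height {
        bank.register_tick();
    }
    bank.freeze();
    assert!(bank.frozen);
    assert_eq!(bank.tick_height, bank.max_tick_height);
}
```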
@@ -324,7 +327,7 @@ pub fn initialize_state(
) -> (BankForks, ProgressMap, HeaviestSubtreeForkChoice) {
let validator_keypairs: Vec<_> = validator_keypairs_map.values().collect();
let GenesisConfigInfo {
genesis_config,
mut genesis_config,
mint_keypair,
voting_keypair: _,
} = create_genesis_config_with_vote_accounts(
@@ -333,12 +336,16 @@ pub fn initialize_state(
vec![stake; validator_keypairs.len()],
);
genesis_config.poh_config.hashes_per_tick = Some(2);
let bank0 = Bank::new(&genesis_config);
for pubkey in validator_keypairs_map.keys() {
bank0.transfer(10_000, &mint_keypair, pubkey).unwrap();
}
while bank0.tick_height() < bank0.max_tick_height() {
bank0.register_tick(&Hash::new_unique());
}
bank0.freeze();
let mut progress = ProgressMap::default();
progress.insert(