From f7adb685990ffe0de258373e5ba25127b5e463d9 Mon Sep 17 00:00:00 2001 From: carllin Date: Thu, 20 Aug 2020 21:56:25 -0700 Subject: [PATCH] Squash supermajority root on blockstore replay at startup (#11727) --- ledger/src/blockstore_processor.rs | 265 +++++++++++++++++++++++++++-- 1 file changed, 253 insertions(+), 12 deletions(-) diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index f982d03f6b..92da01c57a 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -18,6 +18,7 @@ use solana_runtime::{ bank::{Bank, TransactionBalancesSet, TransactionProcessResult, TransactionResults}, bank_forks::BankForks, bank_utils, + commitment::VOTE_THRESHOLD_SIZE, transaction_batch::TransactionBatch, transaction_utils::OrderedIterator, vote_sender_types::ReplayVoteSender, @@ -31,9 +32,10 @@ use solana_sdk::{ timing::duration_as_ms, transaction::{Result, Transaction, TransactionError}, }; +use solana_vote_program::vote_state::VoteState; use std::{ cell::RefCell, - collections::HashMap, + collections::{BTreeMap, HashMap}, path::PathBuf, result, sync::Arc, @@ -454,6 +456,7 @@ pub fn verify_ticks( ) -> std::result::Result<(), BlockError> { let next_bank_tick_height = bank.tick_height() + entries.tick_count(); let max_bank_tick_height = bank.max_tick_height(); + if next_bank_tick_height > max_bank_tick_height { warn!("Too many entry ticks found in slot: {}", bank.slot()); return Err(BlockError::InvalidTickCount); @@ -757,11 +760,18 @@ fn load_frozen_forks( transaction_status_sender: Option, ) -> result::Result>, BlockstoreProcessorError> { let mut initial_forks = HashMap::new(); + let mut all_banks = HashMap::new(); let mut last_status_report = Instant::now(); let mut pending_slots = vec![]; let mut last_root_slot = root_bank.slot(); let mut slots_elapsed = 0; let mut txs = 0; + let blockstore_max_root = blockstore.max_root(); + let max_root = std::cmp::max(root_bank.slot(), blockstore_max_root); + info!( + "load_frozen_forks() latest root from blockstore: {}, max_root: {}", + blockstore_max_root, max_root, + ); process_next_slots( root_bank, root_meta, @@ -810,14 +820,49 @@ fn load_frozen_forks( } txs += progress.num_txs; - if blockstore.is_root(slot) { - *root = slot; - leader_schedule_cache.set_root(&bank); - bank.squash(); - pending_slots.clear(); - initial_forks.clear(); - last_root_slot = slot; + // Block must be frozen by this point, otherwise `process_single_slot` would + // have errored above + assert!(bank.is_frozen()); + all_banks.insert(bank.slot(), bank.clone()); + + // If we've reached the last known root in blockstore, start looking + // for newer cluster confirmed roots + let new_root_bank = { + if *root == max_root { + supermajority_root_from_bank(&bank).and_then(|supermajority_root| { + if supermajority_root > *root { + // If there's a cluster confirmed root greater than our last + // replayed root, then beccause the cluster confirmed root should + // be descended from our last root, it must exist in `all_banks` + let cluster_root_bank = all_banks.get(&supermajority_root).unwrap(); + + // cluster root must be a descendant of our root, otherwise something + // is drastically wrong + assert!(cluster_root_bank.ancestors.contains_key(root)); + info!("blockstore processor found new cluster confirmed root: {}, observed in bank: {}", cluster_root_bank.slot(), bank.slot()); + Some(cluster_root_bank) + } else { + None + } + }) + } else if blockstore.is_root(slot) { + Some(&bank) + } else { + None + } + }; + + if let Some(new_root_bank) = new_root_bank { + *root = new_root_bank.slot(); + last_root_slot = new_root_bank.slot(); + leader_schedule_cache.set_root(&new_root_bank); + new_root_bank.squash(); + + // Filter out all non descendants of the new root + pending_slots.retain(|(_, pending_bank, _)| pending_bank.ancestors.contains_key(root)); + initial_forks.retain(|_, fork_tip_bank| fork_tip_bank.ancestors.contains_key(root)); } + slots_elapsed += 1; trace!( @@ -844,6 +889,51 @@ fn load_frozen_forks( Ok(initial_forks.values().cloned().collect::>()) } +fn supermajority_root(roots: &BTreeMap, total_epoch_stake: u64) -> Option { + // Find latest root + let mut total = 0; + for (root, stake) in roots.iter().rev() { + total += stake; + if total as f64 / total_epoch_stake as f64 > VOTE_THRESHOLD_SIZE { + return Some(*root); + } + } + + None +} + +fn supermajority_root_from_bank(bank: &Bank) -> Option { + let roots: BTreeMap = bank + .vote_accounts() + .into_iter() + .filter_map(|(key, (stake, account))| { + if stake == 0 { + return None; + } + + let vote_state = VoteState::from(&account); + if vote_state.is_none() { + warn!( + "Unable to get vote_state from account {} in bank: {}", + key, + bank.slot() + ); + return None; + } + + vote_state + .unwrap() + .root_slot + .map(|root_slot| (root_slot, stake)) + }) + .collect(); + + let total_epoch_stake = bank.total_epoch_stake(); + + // Find latest root + supermajority_root(&roots, total_epoch_stake) +} + // Processes and replays the contents of a single slot, returns Error // if failed to play the slot fn process_single_slot( @@ -871,6 +961,7 @@ fn process_single_slot( })?; bank.freeze(); // all banks handled by this routine are created from complete slots + Ok(()) } @@ -951,7 +1042,7 @@ pub mod tests { use matches::assert_matches; use rand::{thread_rng, Rng}; use solana_runtime::genesis_utils::{ - create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs, + self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs, }; use solana_sdk::account::Account; use solana_sdk::{ @@ -963,8 +1054,9 @@ pub mod tests { system_transaction, transaction::{Transaction, TransactionError}, }; - use solana_vote_program::vote_transaction; + use solana_vote_program::{vote_state::MAX_LOCKOUT_HISTORY, vote_transaction}; use std::{collections::BTreeSet, sync::RwLock}; + use trees::tr; #[test] fn test_process_blockstore_with_missing_hashes() { @@ -1535,11 +1627,11 @@ pub mod tests { let blockstore = Blockstore::open(&ledger_path).expect("Expected to successfully open database ledger"); - // Let last_slot be the number of slots in the first two epochs + // Let `last_slot` be the number of slots in the first two epochs let epoch_schedule = get_epoch_schedule(&genesis_config, Vec::new()); let last_slot = epoch_schedule.get_last_slot_in_epoch(1); - // Create a single chain of slots with all indexes in the range [0, last_slot + 1] + // Create a single chain of slots with all indexes in the range [0, v + 1] for i in 1..=last_slot + 1 { last_entry_hash = fill_blockstore_slot_with_ticks( &blockstore, @@ -2824,4 +2916,153 @@ pub mod tests { .collect(); assert_eq!(successes, expected_successful_voter_pubkeys); } + + fn make_slot_with_vote_tx( + blockstore: &Blockstore, + ticks_per_slot: u64, + tx_landed_slot: Slot, + parent_slot: Slot, + parent_blockhash: &Hash, + vote_tx: Transaction, + slot_leader_keypair: &Arc, + ) { + // Add votes to `last_slot` so that `root` will be confirmed + let vote_entry = next_entry(&parent_blockhash, 1, vec![vote_tx]); + let mut entries = create_ticks(ticks_per_slot, 0, vote_entry.hash); + entries.insert(0, vote_entry); + blockstore + .write_entries( + tx_landed_slot, + 0, + 0, + ticks_per_slot, + Some(parent_slot), + true, + &slot_leader_keypair, + entries, + 0, + ) + .unwrap(); + } + + fn run_test_process_blockstore_with_supermajority_root(blockstore_root: Option) { + solana_logger::setup(); + /* + Build fork structure: + slot 0 + | + slot 1 <- (blockstore root) + / \ + slot 2 | + | | + slot 4 | + slot 5 + | + `expected_root_slot` + / \ + ... minor fork + / + `last_slot` + */ + let starting_fork_slot = 5; + let mut main_fork = tr(starting_fork_slot); + let mut main_fork_ref = main_fork.root_mut(); + + // Make enough slots to make a root slot > blockstore_root + let expected_root_slot = starting_fork_slot + blockstore_root.unwrap_or(0); + let last_main_fork_slot = expected_root_slot + MAX_LOCKOUT_HISTORY as u64 + 1; + + // Make `minor_fork` + let last_minor_fork_slot = last_main_fork_slot + 1; + let minor_fork = tr(last_minor_fork_slot); + + // Make 'main_fork` + for slot in starting_fork_slot + 1..last_main_fork_slot { + if slot - 1 == expected_root_slot { + main_fork_ref.push_front(minor_fork.clone()); + } + main_fork_ref.push_front(tr(slot)); + main_fork_ref = main_fork_ref.first_mut().unwrap(); + } + let forks = tr(0) / (tr(1) / (tr(2) / (tr(4))) / main_fork); + let validator_keypairs = ValidatorVoteKeypairs::new_rand(); + let GenesisConfigInfo { genesis_config, .. } = + genesis_utils::create_genesis_config_with_vote_accounts( + 10_000, + &[&validator_keypairs], + vec![100], + ); + let ticks_per_slot = genesis_config.ticks_per_slot(); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&ledger_path).unwrap(); + blockstore.add_tree(forks, false, true, ticks_per_slot, genesis_config.hash()); + + if let Some(blockstore_root) = blockstore_root { + blockstore.set_roots(&[blockstore_root]).unwrap(); + } + + let opts = ProcessOptions { + poh_verify: true, + ..ProcessOptions::default() + }; + let (bank_forks, _leader_schedule) = + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts.clone()).unwrap(); + + let last_vote_bank_hash = bank_forks.get(last_main_fork_slot - 1).unwrap().hash(); + let last_vote_blockhash = bank_forks + .get(last_main_fork_slot - 1) + .unwrap() + .last_blockhash(); + let slots: Vec<_> = (expected_root_slot..last_main_fork_slot).collect(); + let vote_tx = vote_transaction::new_vote_transaction( + slots, + last_vote_bank_hash, + last_vote_blockhash, + &validator_keypairs.node_keypair, + &validator_keypairs.vote_keypair, + &validator_keypairs.vote_keypair, + None, + ); + + // Add votes to `last_slot` so that `root` will be confirmed + let leader_keypair = Arc::new(validator_keypairs.node_keypair); + make_slot_with_vote_tx( + &blockstore, + ticks_per_slot, + last_main_fork_slot, + last_main_fork_slot - 1, + &last_vote_blockhash, + vote_tx, + &leader_keypair, + ); + + let (bank_forks, _leader_schedule) = + process_blockstore(&genesis_config, &blockstore, Vec::new(), opts).unwrap(); + + assert_eq!(bank_forks.root(), expected_root_slot); + assert_eq!( + bank_forks.frozen_banks().len() as u64, + last_minor_fork_slot - expected_root_slot + 1 + ); + + // Minor fork at `last_main_fork_slot + 1` was above the `expected_root_slot` + // so should not have been purged + // + // Fork at slot 2 was purged because it was below the `expected_root_slot` + for slot in 0..=last_minor_fork_slot { + if slot >= expected_root_slot { + let bank = bank_forks.get(slot).unwrap(); + assert_eq!(bank.slot(), slot); + assert!(bank.is_frozen()); + } else { + assert!(bank_forks.get(slot).is_none()); + } + } + } + + #[test] + fn test_process_blockstore_with_supermajority_root() { + run_test_process_blockstore_with_supermajority_root(None); + run_test_process_blockstore_with_supermajority_root(Some(1)) + } }