diff --git a/core/src/consensus.rs b/core/src/consensus.rs
index 9223b4944..0e204dbe5 100644
--- a/core/src/consensus.rs
+++ b/core/src/consensus.rs
@@ -293,6 +293,7 @@ impl Tower {
             bank_forks.frozen_banks().values().cloned().collect(),
             node_pubkey,
             vote_account,
+            vec![],
         );
         let root = root_bank.slot();
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 58e57cacf..55e9b7ad2 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -549,6 +549,7 @@ impl ReplayStage {
                     &bank_forks,
                     &my_pubkey,
                     &vote_account,
+                    &blockstore,
                 );
             let mut current_leader = None;
             let mut last_reset = Hash::default();
@@ -1230,16 +1231,29 @@ impl ReplayStage {
         bank_forks: &RwLock<BankForks>,
         my_pubkey: &Pubkey,
         vote_account: &Pubkey,
+        blockstore: &Blockstore,
     ) -> (ProgressMap, HeaviestSubtreeForkChoice) {
-        let (root_bank, frozen_banks) = {
+        let (root_bank, frozen_banks, duplicate_slot_hashes) = {
             let bank_forks = bank_forks.read().unwrap();
+            let duplicate_slots = blockstore
+                .duplicate_slots_iterator(bank_forks.root_bank().slot())
+                .unwrap();
+            let duplicate_slot_hashes = duplicate_slots
+                .filter_map(|slot| bank_forks.bank_hash(slot).map(|hash| (slot, hash)));
             (
                 bank_forks.root_bank(),
                 bank_forks.frozen_banks().values().cloned().collect(),
+                duplicate_slot_hashes.collect::<Vec<_>>(),
             )
         };
-        Self::initialize_progress_and_fork_choice(&root_bank, frozen_banks, my_pubkey, vote_account)
+        Self::initialize_progress_and_fork_choice(
+            &root_bank,
+            frozen_banks,
+            my_pubkey,
+            vote_account,
+            duplicate_slot_hashes,
+        )
     }
 
     pub fn initialize_progress_and_fork_choice(
@@ -1247,6 +1261,7 @@ impl ReplayStage {
         mut frozen_banks: Vec<Arc<Bank>>,
         my_pubkey: &Pubkey,
         vote_account: &Pubkey,
+        duplicate_slot_hashes: Vec<(Slot, Hash)>,
     ) -> (ProgressMap, HeaviestSubtreeForkChoice) {
         let mut progress = ProgressMap::default();
@@ -1261,11 +1276,15 @@ impl ReplayStage {
             );
         }
         let root = root_bank.slot();
-        let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new_from_frozen_banks(
+        let mut heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new_from_frozen_banks(
             (root, root_bank.hash()),
             &frozen_banks,
         );
+        for slot_hash in duplicate_slot_hashes {
+            heaviest_subtree_fork_choice.mark_fork_invalid_candidate(&slot_hash);
+        }
+
         (progress, heaviest_subtree_fork_choice)
     }
@@ -2086,6 +2105,30 @@ impl ReplayStage {
             purge_repair_slot_counter,
             SlotStateUpdate::Dead(dead_state),
         );
+
+        // If we previously marked this slot as duplicate in blockstore, let the state machine know
+        if !duplicate_slots_tracker.contains(&slot) && blockstore.get_duplicate_slot(slot).is_some()
+        {
+            let duplicate_state = DuplicateState::new_from_state(
+                slot,
+                gossip_duplicate_confirmed_slots,
+                heaviest_subtree_fork_choice,
+                || true,
+                || None,
+            );
+            check_slot_agrees_with_cluster(
+                slot,
+                root,
+                blockstore,
+                duplicate_slots_tracker,
+                epoch_slots_frozen_slots,
+                heaviest_subtree_fork_choice,
+                duplicate_slots_to_repair,
+                ancestor_hashes_replay_update_sender,
+                purge_repair_slot_counter,
+                SlotStateUpdate::Duplicate(duplicate_state),
+            );
+        }
     }
 
     #[allow(clippy::too_many_arguments)]
@@ -2827,6 +2870,30 @@ impl ReplayStage {
             purge_repair_slot_counter,
             SlotStateUpdate::BankFrozen(bank_frozen_state),
         );
+        // If we previously marked this slot as duplicate in blockstore, let the state machine know
+        if !duplicate_slots_tracker.contains(&bank.slot())
+            && blockstore.get_duplicate_slot(bank.slot()).is_some()
+        {
+            let duplicate_state = DuplicateState::new_from_state(
+                bank.slot(),
+                gossip_duplicate_confirmed_slots,
+                heaviest_subtree_fork_choice,
+                || false,
+                || Some(bank.hash()),
+            );
+            check_slot_agrees_with_cluster(
+                bank.slot(),
+                bank_forks.read().unwrap().root(),
+                blockstore,
+                duplicate_slots_tracker,
+                epoch_slots_frozen_slots,
+                heaviest_subtree_fork_choice,
+                duplicate_slots_to_repair,
+                ancestor_hashes_replay_update_sender,
+                purge_repair_slot_counter,
+                SlotStateUpdate::Duplicate(duplicate_state),
+            );
+        }
         if let Some(sender) = bank_notification_sender {
             sender
                 .sender
diff --git a/local-cluster/src/integration_tests.rs b/local-cluster/src/integration_tests.rs
index e7691bc1c..41e803799 100644
--- a/local-cluster/src/integration_tests.rs
+++ b/local-cluster/src/integration_tests.rs
@@ -26,6 +26,7 @@ use {
     solana_ledger::{
         ancestor_iterator::AncestorIterator,
         blockstore::{Blockstore, PurgeType},
+        blockstore_meta::DuplicateSlotProof,
         blockstore_options::{AccessType, BlockstoreOptions},
         leader_schedule::{FixedSchedule, LeaderSchedule},
     },
@@ -153,6 +154,23 @@ pub fn wait_for_last_vote_in_tower_to_land_in_ledger(
     })
 }
 
+/// Waits roughly 10 seconds for duplicate proof to appear in blockstore at `dup_slot`. Returns proof if found.
+pub fn wait_for_duplicate_proof(ledger_path: &Path, dup_slot: Slot) -> Option<DuplicateSlotProof> {
+    for _ in 0..10 {
+        let duplicate_fork_validator_blockstore = open_blockstore(ledger_path);
+        if let Some((found_dup_slot, found_duplicate_proof)) =
+            duplicate_fork_validator_blockstore.get_first_duplicate_proof()
+        {
+            if found_dup_slot == dup_slot {
+                return Some(found_duplicate_proof);
+            };
+        }
+
+        sleep(Duration::from_millis(1000));
+    }
+    None
+}
+
 pub fn copy_blocks(end_slot: Slot, source: &Blockstore, dest: &Blockstore) {
     for slot in std::iter::once(end_slot).chain(AncestorIterator::new(end_slot, source)) {
         let source_meta = source.meta(slot).unwrap().unwrap();
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index 118332ed3..2280bd98f 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -24,10 +24,9 @@ use {
         ancestor_iterator::AncestorIterator,
         bank_forks_utils,
         blockstore::{entries_to_test_shreds, Blockstore},
-        blockstore_meta::DuplicateSlotProof,
         blockstore_processor::ProcessOptions,
         leader_schedule::FixedSchedule,
-        shred::Shred,
+        shred::{ProcessShredsStats, ReedSolomonCache, Shred, Shredder},
         use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
     },
@@ -39,7 +38,7 @@ use {
         last_root_in_tower, last_vote_in_tower, ms_for_n_slots, open_blockstore,
         purge_slots_with_count, remove_tower, remove_tower_if_exists, restore_tower,
         run_cluster_partition, run_kill_partition_switch_threshold, save_tower,
-        setup_snapshot_validator_config, test_faulty_node,
+        setup_snapshot_validator_config, test_faulty_node, wait_for_duplicate_proof,
         wait_for_last_vote_in_tower_to_land_in_ledger, SnapshotValidatorConfig, ValidatorTestConfig,
         DEFAULT_CLUSTER_LAMPORTS, DEFAULT_NODE_STAKE, RUST_LOG_FILTER,
     },
@@ -69,7 +68,7 @@ use {
         client::{AsyncClient, SyncClient},
         clock::{self, Slot, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE},
         commitment_config::CommitmentConfig,
-        epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
+        epoch_schedule::{DEFAULT_SLOTS_PER_EPOCH, MINIMUM_SLOTS_PER_EPOCH},
         genesis_config::ClusterType,
         hard_forks::HardForks,
         hash::Hash,
@@ -5145,22 +5144,6 @@ fn test_duplicate_shreds_switch_failure() {
         }
     }
 
-    fn wait_for_duplicate_proof(ledger_path: &Path, dup_slot: Slot) -> Option<DuplicateSlotProof> {
-        for _ in 0..10 {
-            let duplicate_fork_validator_blockstore = open_blockstore(ledger_path);
-            if let Some((found_dup_slot, found_duplicate_proof)) =
-                duplicate_fork_validator_blockstore.get_first_duplicate_proof()
-            {
-                if found_dup_slot == dup_slot {
-                    return Some(found_duplicate_proof);
-                };
-            }
-
-            sleep(Duration::from_millis(1000));
-        }
-        None
-    }
-
     solana_logger::setup_with_default(RUST_LOG_FILTER);
     let validator_keypairs = [
         "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
         "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8",
     ]
     .iter()
     .map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
     .collect::<Vec<_>>();
@@ -5506,3 +5489,156 @@ fn test_duplicate_shreds_switch_failure() {
         SocketAddrSpace::Unspecified,
     );
 }
+
+/// Forks previously marked invalid should be marked as such in fork choice on restart
+#[test]
+#[serial]
+fn test_invalid_forks_persisted_on_restart() {
+    solana_logger::setup_with("info,solana_metrics=off,solana_ledger=off");
+
+    let dup_slot = 10;
+    let validator_keypairs = [
+        "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
+        "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8",
+    ]
+    .iter()
+    .map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
+    .collect::<Vec<_>>();
+    let majority_keypair = validator_keypairs[1].0.clone();
+
+    let validators = validator_keypairs
+        .iter()
+        .map(|(kp, _)| kp.pubkey())
+        .collect::<Vec<_>>();
+
+    let node_stakes = vec![DEFAULT_NODE_STAKE, 100 * DEFAULT_NODE_STAKE];
+    let (target_pubkey, majority_pubkey) = (validators[0], validators[1]);
+    // Need majority validator to make the dup_slot
+    let validator_to_slots = vec![
+        (majority_pubkey, dup_slot as usize + 5),
+        (target_pubkey, DEFAULT_SLOTS_PER_EPOCH as usize),
+    ];
+    let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter());
+    let mut default_config = ValidatorConfig::default_for_test();
+    default_config.fixed_leader_schedule = Some(FixedSchedule {
+        leader_schedule: Arc::new(leader_schedule),
+    });
+    let mut validator_configs = make_identical_validator_configs(&default_config, 2);
+    // Majority shouldn't duplicate confirm anything
+    validator_configs[1].voting_disabled = true;
+
+    let mut cluster = LocalCluster::new(
+        &mut ClusterConfig {
+            cluster_lamports: DEFAULT_CLUSTER_LAMPORTS + node_stakes.iter().sum::<u64>(),
+            validator_configs,
+            node_stakes,
+            validator_keys: Some(validator_keypairs),
+            skip_warmup_slots: true,
+            ..ClusterConfig::default()
+        },
+        SocketAddrSpace::Unspecified,
+    );
+
+    let target_ledger_path = cluster.ledger_path(&target_pubkey);
+
+    // Wait for us to vote past duplicate slot
+    let timer = Instant::now();
+    loop {
+        if let Some(slot) =
+            wait_for_last_vote_in_tower_to_land_in_ledger(&target_ledger_path, &target_pubkey)
+        {
+            if slot > dup_slot {
+                break;
+            }
+        }
+
+        assert!(
+            timer.elapsed() < Duration::from_secs(30),
+            "Did not make more than 10 blocks in 30 seconds"
+        );
+        sleep(Duration::from_millis(100));
+    }
+
+    // Send duplicate
+    let parent = {
+        let blockstore = open_blockstore(&target_ledger_path);
+        let parent = blockstore
+            .meta(dup_slot)
+            .unwrap()
+            .unwrap()
+            .parent_slot
+            .unwrap();
+
+        let entries = create_ticks(
+            64 * (std::cmp::max(1, dup_slot - parent)),
+            0,
+            cluster.genesis_config.hash(),
+        );
+        let last_hash = entries.last().unwrap().hash;
+        let version = solana_sdk::shred_version::version_from_hash(&last_hash);
+        let dup_shreds = Shredder::new(dup_slot, parent, 0, version)
+            .unwrap()
+            .entries_to_shreds(
+                &majority_keypair,
+                &entries,
+                true,  // is_full_slot
+                0,     // next_shred_index,
+                0,     // next_code_index
+                false, // merkle_variant
+                &ReedSolomonCache::default(),
+                &mut ProcessShredsStats::default(),
+            )
+            .0;
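+
+        // These tick-only shreds are signed by the majority leader for `dup_slot` but differ from
+        // the version of `dup_slot` already in the target's blockstore, so ingesting them should
+        // cause blockstore to record a duplicate proof.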
info!("Sending duplicate shreds for {dup_slot}"); + cluster.send_shreds_to_validator(dup_shreds.iter().collect(), &target_pubkey); + wait_for_duplicate_proof(&target_ledger_path, dup_slot) + .expect("Duplicate proof for {dup_slot} not found"); + parent + }; + + info!("Duplicate proof for {dup_slot} has landed, restarting node"); + let info = cluster.exit_node(&target_pubkey); + + { + let blockstore = open_blockstore(&target_ledger_path); + purge_slots_with_count(&blockstore, dup_slot + 5, 100); + } + + // Restart, should create an entirely new fork + cluster.restart_node(&target_pubkey, info, SocketAddrSpace::Unspecified); + + info!("Waiting for fork built off {parent}"); + let timer = Instant::now(); + let mut checked_children: HashSet = HashSet::default(); + let mut done = false; + while !done { + let blockstore = open_blockstore(&target_ledger_path); + let parent_meta = blockstore.meta(parent).unwrap().expect("Meta must exist"); + for child in parent_meta.next_slots { + if checked_children.contains(&child) { + continue; + } + + if blockstore.is_full(child) { + let shreds = blockstore + .get_data_shreds_for_slot(child, 0) + .expect("Child is full"); + let mut is_our_block = true; + for shred in shreds { + is_our_block &= shred.verify(&target_pubkey); + } + if is_our_block { + done = true; + } + checked_children.insert(child); + } + } + + assert!( + timer.elapsed() < Duration::from_secs(30), + "Did not create a new fork off parent {parent} in 30 seconds after restart" + ); + sleep(Duration::from_millis(100)); + } +}