diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 16b7c3d20..b6458e4d3 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1072,7 +1072,7 @@ impl Tower { if let Some(last_voted_slot) = self.last_voted_slot() { if tower_root <= replayed_root { // Normally, we goes into this clause with possible help of - // reconcile_blockstore_roots_with_tower() + // reconcile_blockstore_roots_with_external_source() if slot_history.check(last_voted_slot) == Check::TooOld { // We could try hard to anchor with other older votes, but opt to simplify the // following logic @@ -1320,45 +1320,77 @@ impl TowerError { } } +#[derive(Debug)] +pub enum ExternalRootSource { + Tower(Slot), + HardFork(Slot), +} + +impl ExternalRootSource { + fn root(&self) -> Slot { + match self { + ExternalRootSource::Tower(slot) => *slot, + ExternalRootSource::HardFork(slot) => *slot, + } + } +} + // Given an untimely crash, tower may have roots that are not reflected in blockstore, // or the reverse of this. // That's because we don't impose any ordering guarantee or any kind of write barriers // between tower (plain old POSIX fs calls) and blockstore (through RocksDB), when // `ReplayState::handle_votable_bank()` saves tower before setting blockstore roots. -pub fn reconcile_blockstore_roots_with_tower( - tower: &Tower, +pub fn reconcile_blockstore_roots_with_external_source( + external_source: ExternalRootSource, blockstore: &Blockstore, + // blockstore.last_root() might have been updated already. + // so take a &mut param both to input (and output iff we update root) + last_blockstore_root: &mut Slot, ) -> blockstore_db::Result<()> { - let tower_root = tower.root(); - let last_blockstore_root = blockstore.last_root(); - if last_blockstore_root < tower_root { - // Ensure tower_root itself to exist and be marked as rooted in the blockstore + let external_root = external_source.root(); + if *last_blockstore_root < external_root { + // Ensure external_root itself to exist and be marked as rooted in the blockstore // in addition to its ancestors. - let new_roots: Vec<_> = AncestorIterator::new_inclusive(tower_root, blockstore) - .take_while(|current| match current.cmp(&last_blockstore_root) { + let new_roots: Vec<_> = AncestorIterator::new_inclusive(external_root, blockstore) + .take_while(|current| match current.cmp(last_blockstore_root) { Ordering::Greater => true, Ordering::Equal => false, Ordering::Less => panic!( - "couldn't find a last_blockstore_root upwards from: {}!?", - tower_root + "last_blockstore_root({}) is skipped while traversing \ + blockstore (currently at {}) from external root ({:?})!?", + last_blockstore_root, current, external_source, ), }) .collect(); if !new_roots.is_empty() { info!( - "Reconciling slots as root based on tower root: {:?} ({}..{}) ", - new_roots, tower_root, last_blockstore_root + "Reconciling slots as root based on external root: {:?} (external: {:?}, blockstore: {})", + new_roots, external_source, last_blockstore_root ); - blockstore.set_roots(new_roots.iter())?; + + // Unfortunately, we can't supply duplicate-confirmed hashes, + // because it can't be guaranteed to be able to replay these slots + // under this code-path's limited condition (i.e. those shreds + // might not be available, etc...) also correctly overcoming this + // limitation is hard... + blockstore.mark_slots_as_if_rooted_normally_at_startup( + new_roots.into_iter().map(|root| (root, None)).collect(), + false, + )?; + + // Update the caller-managed state of last root in blockstore. 
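// A standalone, simplified model (not part of this patch) of what
// reconcile_blockstore_roots_with_external_source() is meant to guarantee:
// walk from the external root (tower or hard fork) back down to the caller's
// last known blockstore root, mark everything in between as rooted, and
// refresh the caller-held `last_blockstore_root` so that a second pass over
// the same range is a no-op. All types here are toy stand-ins (BTreeMap /
// BTreeSet instead of RocksDB columns); only the control flow mirrors the patch.
use std::collections::{BTreeMap, BTreeSet};

type Slot = u64;

#[derive(Debug)]
enum ExternalRootSource {
    Tower(Slot),
    HardFork(Slot),
}

impl ExternalRootSource {
    fn root(&self) -> Slot {
        match self {
            ExternalRootSource::Tower(slot) | ExternalRootSource::HardFork(slot) => *slot,
        }
    }
}

// parent links and the root set stand in for the blockstore columns
struct ToyBlockstore {
    parents: BTreeMap<Slot, Slot>,
    roots: BTreeSet<Slot>,
}

impl ToyBlockstore {
    fn last_root(&self) -> Slot {
        self.roots.iter().next_back().copied().unwrap_or(0)
    }

    fn reconcile(&mut self, source: ExternalRootSource, last_blockstore_root: &mut Slot) {
        let external_root = source.root();
        if external_root <= *last_blockstore_root {
            return; // blockstore is already at (or past) the external root
        }
        // external_root plus its ancestors strictly above last_blockstore_root
        let mut current = external_root;
        while current > *last_blockstore_root {
            self.roots.insert(current);
            match self.parents.get(&current) {
                Some(parent) => current = *parent,
                // the real code warn!()s (or panics on a skipped root) here;
                // the model just stops walking
                None => break,
            }
        }
        // update the caller-held value so a repeated call becomes a no-op
        *last_blockstore_root = self.last_root();
    }
}

fn main() {
    let mut blockstore = ToyBlockstore {
        parents: BTreeMap::from([(1, 0), (3, 1), (4, 3)]),
        roots: BTreeSet::from([0, 1]),
    };
    let mut last_root = blockstore.last_root();
    blockstore.reconcile(ExternalRootSource::Tower(4), &mut last_root);
    assert_eq!(last_root, 4);
    // reconciling again (e.g. the later hard-fork pass) changes nothing
    blockstore.reconcile(ExternalRootSource::HardFork(4), &mut last_root);
    assert_eq!(last_root, 4);
}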
+ // Repeated calls of this function should result in a no-op for + // the range of `new_roots`. + *last_blockstore_root = blockstore.last_root(); } else { // This indicates we're in bad state; but still don't panic here. // That's because we might have a chance of recovering properly with // newer snapshot. warn!( - "Couldn't find any ancestor slots from tower root ({}) \ + "Couldn't find any ancestor slots from external source ({:?}) \ towards blockstore root ({}); blockstore pruned or only \ - tower moved into new ledger?", - tower_root, last_blockstore_root, + tower moved into new ledger or just hard fork?", + external_source, last_blockstore_root, ); } } @@ -2814,7 +2846,12 @@ pub mod test { let mut tower = Tower::default(); tower.vote_state.root_slot = Some(4); - reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap(); + reconcile_blockstore_roots_with_external_source( + ExternalRootSource::Tower(tower.root()), + &blockstore, + &mut blockstore.last_root(), + ) + .unwrap(); assert!(!blockstore.is_root(0)); assert!(blockstore.is_root(1)); @@ -2825,7 +2862,9 @@ pub mod test { } #[test] - #[should_panic(expected = "couldn't find a last_blockstore_root upwards from: 4!?")] + #[should_panic(expected = "last_blockstore_root(3) is skipped while \ + traversing blockstore (currently at 1) from \ + external root (Tower(4))!?")] fn test_reconcile_blockstore_roots_with_tower_panic_no_common_root() { solana_logger::setup(); let blockstore_path = get_tmp_ledger_path!(); @@ -2846,7 +2885,12 @@ pub mod test { let mut tower = Tower::default(); tower.vote_state.root_slot = Some(4); - reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap(); + reconcile_blockstore_roots_with_external_source( + ExternalRootSource::Tower(tower.root()), + &blockstore, + &mut blockstore.last_root(), + ) + .unwrap(); } Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } @@ -2869,7 +2913,12 @@ pub mod test { let mut tower = Tower::default(); tower.vote_state.root_slot = Some(4); assert_eq!(blockstore.last_root(), 0); - reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap(); + reconcile_blockstore_roots_with_external_source( + ExternalRootSource::Tower(tower.root()), + &blockstore, + &mut blockstore.last_root(), + ) + .unwrap(); assert_eq!(blockstore.last_root(), 0); } Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 1e2b4be3e..268695613 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -23,6 +23,7 @@ use { rewards_recorder_service::RewardsRecorderSender, tower_storage::{SavedTower, SavedTowerVersions, TowerStorage}, unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, + validator::ProcessBlockStore, voting_service::VoteOp, window_service::DuplicateSlotReceiver, }, @@ -352,7 +353,7 @@ pub struct ReplayStage { impl ReplayStage { #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)] - pub fn new + Sized>( + pub fn new( config: ReplayStageConfig, blockstore: Arc, bank_forks: Arc>, @@ -360,7 +361,7 @@ impl ReplayStage { ledger_signal_receiver: Receiver, duplicate_slots_receiver: DuplicateSlotReceiver, poh_recorder: Arc>, - tower: T, + maybe_process_blockstore: Option, vote_tracker: Arc, cluster_slots: Arc, retransmit_slots_sender: RetransmitSlotsSender, @@ -375,8 +376,14 @@ impl ReplayStage { block_metadata_notifier: Option, transaction_cost_metrics_sender: Option, ) -> Self { - let mut tower = 
tower.into(); - info!("Tower state: {:?}", tower); + let mut tower = if let Some(process_blockstore) = maybe_process_blockstore { + let tower = process_blockstore.process_to_create_tower(); + info!("Tower state: {:?}", tower); + tower + } else { + warn!("creating default tower...."); + Tower::default() + }; let ReplayStageConfig { vote_account, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index f9b7f5261..88a5b0bbf 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -11,7 +11,6 @@ use { }, cluster_slots::ClusterSlots, completed_data_sets_service::CompletedDataSetsSender, - consensus::Tower, cost_update_service::CostUpdateService, drop_bank_service::DropBankService, ledger_cleanup_service::LedgerCleanupService, @@ -22,6 +21,7 @@ use { sigverify_shreds::ShredSigVerifier, sigverify_stage::SigVerifyStage, tower_storage::TowerStorage, + validator::ProcessBlockStore, voting_service::VotingService, warm_quic_cache_service::WarmQuicCacheService, }, @@ -96,7 +96,7 @@ impl Tvu { /// * `sockets` - fetch, repair, and retransmit sockets /// * `blockstore` - the ledger itself #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)] - pub fn new + Sized>( + pub fn new( vote_account: &Pubkey, authorized_voter_keypairs: Arc>>>, bank_forks: &Arc>, @@ -106,7 +106,7 @@ impl Tvu { ledger_signal_receiver: Receiver, rpc_subscriptions: &Arc, poh_recorder: &Arc>, - tower: T, + maybe_process_block_store: Option, tower_storage: Arc, leader_schedule_cache: &Arc, exit: &Arc, @@ -260,7 +260,7 @@ impl Tvu { ledger_signal_receiver, duplicate_slots_receiver, poh_recorder.clone(), - tower, + maybe_process_block_store, vote_tracker, cluster_slots, retransmit_slots_sender, @@ -384,7 +384,6 @@ pub mod tests { let (completed_data_sets_sender, _completed_data_sets_receiver) = unbounded(); let (_, gossip_confirmed_slots_receiver) = unbounded(); let bank_forks = Arc::new(RwLock::new(bank_forks)); - let tower = Tower::default(); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let tvu = Tvu::new( &vote_keypair.pubkey(), @@ -410,7 +409,7 @@ pub mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), )), &poh_recorder, - tower, + None, Arc::new(crate::tower_storage::FileTowerStorage::default()), &leader_schedule_cache, &exit, diff --git a/core/src/validator.rs b/core/src/validator.rs index 795ce7833..229c66d1b 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -8,7 +8,7 @@ use { cache_block_meta_service::{CacheBlockMetaSender, CacheBlockMetaService}, cluster_info_vote_listener::VoteTracker, completed_data_sets_service::CompletedDataSetsService, - consensus::{reconcile_blockstore_roots_with_tower, Tower}, + consensus::{reconcile_blockstore_roots_with_external_source, ExternalRootSource, Tower}, ledger_metric_report_service::LedgerMetricReportService, poh_timing_report_service::PohTimingReportService, rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService}, @@ -514,6 +514,7 @@ impl Validator { genesis_config, bank_forks, blockstore, + original_blockstore_root, ledger_signal_receiver, completed_slots_receiver, leader_schedule_cache, @@ -669,6 +670,7 @@ impl Validator { vote_account, &start_progress, &blockstore, + original_blockstore_root, &bank_forks, &leader_schedule_cache, &blockstore_process_options, @@ -945,7 +947,7 @@ impl Validator { ledger_signal_receiver, &rpc_subscriptions, &poh_recorder, - process_blockstore, + Some(process_blockstore), config.tower_storage.clone(), &leader_schedule_cache, &exit, @@ -1235,6 +1237,17 @@ fn 
check_poh_speed(genesis_config: &GenesisConfig, maybe_hash_samples: Option Option { + // detect cluster restart (hard fork) indirectly via wait_for_supermajority... + if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority { + if wait_slot_for_supermajority == root_slot { + return Some(wait_slot_for_supermajority); + } + } + + None +} + fn post_process_restored_tower( restored_tower: crate::consensus::Result, validator_identity: &Pubkey, @@ -1248,29 +1261,28 @@ fn post_process_restored_tower( .and_then(|tower| { let root_bank = bank_forks.root_bank(); let slot_history = root_bank.get_slot_history(); + // make sure tower isn't corrupted first before the following hard fork check let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history); - if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority { - if root_bank.slot() == wait_slot_for_supermajority { - // intentionally fail to restore tower; we're supposedly in a new hard fork; past - // out-of-chain vote state doesn't make sense at all - // what if --wait-for-supermajority again if the validator restarted? - let message = format!("Hardfork is detected; discarding tower restoration result: {:?}", tower); - datapoint_error!( - "tower_error", - ( - "error", - message, - String - ), - ); - error!("{}", message); + if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(config, root_bank.slot()) { + // intentionally fail to restore tower; we're supposedly in a new hard fork; past + // out-of-chain vote state doesn't make sense at all + // what if --wait-for-supermajority again if the validator restarted? + let message = format!("Hard fork is detected; discarding tower restoration result: {:?}", tower); + datapoint_error!( + "tower_error", + ( + "error", + message, + String + ), + ); + error!("{}", message); - // unconditionally relax tower requirement so that we can always restore tower - // from root bank. - should_require_tower = false; - return Err(crate::consensus::TowerError::HardFork(wait_slot_for_supermajority)); - } + // unconditionally relax tower requirement so that we can always restore tower + // from root bank. + should_require_tower = false; + return Err(crate::consensus::TowerError::HardFork(hard_fork_restart_slot)); } if let Some(warp_slot) = config.warp_slot { @@ -1337,6 +1349,7 @@ fn load_blockstore( GenesisConfig, Arc>, Arc, + Slot, Receiver, CompletedSlotsReceiver, LeaderScheduleCache, @@ -1389,6 +1402,9 @@ fn load_blockstore( .expect("Failed to open ledger database"); blockstore.set_no_compaction(config.no_rocksdb_compaction); blockstore.shred_timing_point_sender = poh_timing_point_sender; + // following boot sequence (esp BankForks) could set root. so stash the original value + // of blockstore root away here as soon as possible. 
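// Sketch (not from this patch): the new maybe_cluster_restart_with_hard_fork()
// check reduces to "--wait-for-supermajority equals the restored root slot".
// The same predicate gates two things in this patch: discarding a restored
// tower in post_process_restored_tower(), and the second blockstore
// reconciliation pass in ProcessBlockStore::process(). The Config struct
// below is a simplified stand-in for ValidatorConfig; only the predicate
// mirrors the patch.
type Slot = u64;

struct Config {
    wait_for_supermajority: Option<Slot>,
}

fn maybe_cluster_restart_with_hard_fork(config: &Config, root_slot: Slot) -> Option<Slot> {
    // detect a cluster restart (hard fork) indirectly via wait_for_supermajority
    match config.wait_for_supermajority {
        Some(wait_slot) if wait_slot == root_slot => Some(wait_slot),
        _ => None,
    }
}

fn main() {
    let config = Config { wait_for_supermajority: Some(64) };
    // restarting right at the hard fork slot: the old tower is discarded
    assert_eq!(maybe_cluster_restart_with_hard_fork(&config, 64), Some(64));
    // an ordinary restart some slots later: the restored tower is kept
    assert_eq!(maybe_cluster_restart_with_hard_fork(&config, 70), None);
}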
+ let original_blockstore_root = blockstore.last_root(); let blockstore = Arc::new(blockstore); let blockstore_root_scan = BlockstoreRootScan::new(config, &blockstore, exit); @@ -1493,6 +1509,7 @@ fn load_blockstore( genesis_config, bank_forks, blockstore, + original_blockstore_root, ledger_signal_receiver, completed_slots_receiver, leader_schedule_cache, @@ -1527,11 +1544,12 @@ fn highest_slot(blockstore: &Blockstore) -> Option { highest_slot } -struct ProcessBlockStore<'a> { +pub struct ProcessBlockStore<'a> { id: &'a Pubkey, vote_account: &'a Pubkey, start_progress: &'a Arc>, blockstore: &'a Blockstore, + original_blockstore_root: Slot, bank_forks: &'a Arc>, leader_schedule_cache: &'a LeaderScheduleCache, process_options: &'a blockstore_processor::ProcessOptions, @@ -1550,6 +1568,7 @@ impl<'a> ProcessBlockStore<'a> { vote_account: &'a Pubkey, start_progress: &'a Arc>, blockstore: &'a Blockstore, + original_blockstore_root: Slot, bank_forks: &'a Arc>, leader_schedule_cache: &'a LeaderScheduleCache, process_options: &'a blockstore_processor::ProcessOptions, @@ -1564,6 +1583,7 @@ impl<'a> ProcessBlockStore<'a> { vote_account, start_progress, blockstore, + original_blockstore_root, bank_forks, leader_schedule_cache, process_options, @@ -1576,7 +1596,7 @@ impl<'a> ProcessBlockStore<'a> { } } - fn process(&mut self) { + pub(crate) fn process(&mut self) { if self.tower.is_none() { let previous_start_process = *self.start_progress.read().unwrap(); *self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger; @@ -1633,12 +1653,16 @@ impl<'a> ProcessBlockStore<'a> { self.tower = Some({ let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id); if let Ok(tower) = &restored_tower { - reconcile_blockstore_roots_with_tower(tower, self.blockstore).unwrap_or_else( - |err| { - error!("Failed to reconcile blockstore with tower: {:?}", err); - abort() - }, - ); + // reconciliation attempt 1 of 2 with tower + reconcile_blockstore_roots_with_external_source( + ExternalRootSource::Tower(tower.root()), + self.blockstore, + &mut self.original_blockstore_root, + ) + .unwrap_or_else(|err| { + error!("Failed to reconcile blockstore with tower: {:?}", err); + abort() + }); } post_process_restored_tower( @@ -1650,15 +1674,30 @@ impl<'a> ProcessBlockStore<'a> { ) }); + if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork( + self.config, + self.bank_forks.read().unwrap().root_bank().slot(), + ) { + // reconciliation attempt 2 of 2 with hard fork + // this should be #2 because hard fork root > tower root in almost all cases + reconcile_blockstore_roots_with_external_source( + ExternalRootSource::HardFork(hard_fork_restart_slot), + self.blockstore, + &mut self.original_blockstore_root, + ) + .unwrap_or_else(|err| { + error!("Failed to reconcile blockstore with hard fork: {:?}", err); + abort() + }); + } + *self.start_progress.write().unwrap() = previous_start_process; } } -} -impl<'a> From> for Tower { - fn from(mut process_blockstore: ProcessBlockStore<'a>) -> Self { - process_blockstore.process(); - process_blockstore.tower.expect("valid tower") + pub(crate) fn process_to_create_tower(mut self) -> Tower { + self.process(); + self.tower.unwrap() } } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index cce9b3d8c..c8801eae8 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -525,13 +525,28 @@ impl Blockstore { Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot)) } - pub fn 
rooted_slot_iterator(&self, slot: Slot) -> Result + '_> { + fn prepare_rooted_slot_iterator( + &self, + slot: Slot, + direction: IteratorDirection, + ) -> Result + '_> { let slot_iterator = self .db - .iter::(IteratorMode::From(slot, IteratorDirection::Forward))?; + .iter::(IteratorMode::From(slot, direction))?; Ok(slot_iterator.map(move |(rooted_slot, _)| rooted_slot)) } + pub fn rooted_slot_iterator(&self, slot: Slot) -> Result + '_> { + self.prepare_rooted_slot_iterator(slot, IteratorDirection::Forward) + } + + pub fn reversed_rooted_slot_iterator( + &self, + slot: Slot, + ) -> Result + '_> { + self.prepare_rooted_slot_iterator(slot, IteratorDirection::Reverse) + } + /// Determines if starting_slot and ending_slot are connected pub fn slots_connected(&self, starting_slot: Slot, ending_slot: Slot) -> bool { let mut next_slots: VecDeque<_> = vec![starting_slot].into(); @@ -1745,6 +1760,13 @@ impl Blockstore { self.meta_cf.put_bytes(slot, bytes) } + /// Manually update the meta for a slot. + /// Can interfere with automatic meta update and potentially break chaining. + /// Dangerous. Use with care. + pub fn put_meta(&self, slot: Slot, meta: &SlotMeta) -> Result<()> { + self.put_meta_bytes(slot, &bincode::serialize(meta)?) + } + // Given a start and end entry index, find all the missing // indexes in the ledger in the range [start_index, end_index) // for the slot with the specified slot @@ -3043,6 +3065,22 @@ impl Blockstore { Ok(()) } + pub fn mark_slots_as_if_rooted_normally_at_startup( + &self, + slots: Vec<(Slot, Option)>, + with_hash: bool, + ) -> Result<()> { + self.set_roots(slots.iter().map(|(slot, _hash)| slot))?; + if with_hash { + self.set_duplicate_confirmed_slots_and_hashes( + slots + .into_iter() + .map(|(slot, maybe_hash)| (slot, maybe_hash.unwrap())), + )?; + } + Ok(()) + } + pub fn is_dead(&self, slot: Slot) -> bool { matches!( self.db diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 8a0257c7b..4dfbf0203 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -214,6 +214,11 @@ impl SlotMeta { Some(self.consumed) == self.last_index.map(|ix| ix + 1) } + /// Dangerous. Currently only needed for a local-cluster test + pub fn unset_parent(&mut self) { + self.parent_slot = None; + } + pub fn clear_unconfirmed_slot(&mut self) { let mut new_self = SlotMeta::new_orphan(self.slot); std::mem::swap(&mut new_self.next_slots, &mut self.next_slots); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 61be72395..72fcc6595 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -746,8 +746,11 @@ pub fn process_blockstore_from_root( // ensure start_slot is rooted for correct replay if blockstore.is_primary_access() { blockstore - .set_roots(std::iter::once(&start_slot)) - .expect("Couldn't set root slot on startup"); + .mark_slots_as_if_rooted_normally_at_startup( + vec![(bank.slot(), Some(bank.hash()))], + true, + ) + .expect("Couldn't mark start_slot as root on startup"); } else { info!( "Starting slot {} isn't root and won't be updated due to being secondary blockstore access", @@ -1361,17 +1364,16 @@ fn load_frozen_forks( if new_root_bank.slot() == root { break; } // Found the last root in the chain, yay! 
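// Sketch (not from this patch) of the contract behind
// mark_slots_as_if_rooted_normally_at_startup(): every (slot, maybe_hash)
// pair is marked as a root, and only when `with_hash` is true are the hashes
// additionally recorded as duplicate-confirmed — which is why the startup
// reconciliation path passes (root, None) with `with_hash = false`, while the
// cluster-confirmed-root path below passes real hashes with `true`.
// HashSet/HashMap stand in for the real blockstore columns.
use std::collections::{HashMap, HashSet};

type Slot = u64;
type Hash = [u8; 32];

#[derive(Default)]
struct ToyBlockstore {
    roots: HashSet<Slot>,
    duplicate_confirmed: HashMap<Slot, Hash>,
}

impl ToyBlockstore {
    fn mark_slots_as_if_rooted_normally_at_startup(
        &mut self,
        slots: Vec<(Slot, Option<Hash>)>,
        with_hash: bool,
    ) {
        for (slot, _) in &slots {
            self.roots.insert(*slot);
        }
        if with_hash {
            for (slot, maybe_hash) in slots {
                // the real code unwraps here too, i.e. callers must supply hashes
                self.duplicate_confirmed.insert(slot, maybe_hash.unwrap());
            }
        }
    }
}

fn main() {
    let mut store = ToyBlockstore::default();
    // startup reconciliation: roots only, no duplicate-confirmed hashes
    store.mark_slots_as_if_rooted_normally_at_startup(vec![(3, None), (4, None)], false);
    assert!(store.roots.contains(&4));
    assert!(store.duplicate_confirmed.is_empty());
    // cluster-confirmed roots during replay: hashes are known, so record them too
    store.mark_slots_as_if_rooted_normally_at_startup(vec![(5, Some([0u8; 32]))], true);
    assert_eq!(store.duplicate_confirmed.get(&5), Some(&[0u8; 32]));
}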
assert!(new_root_bank.slot() > root); - rooted_slots.push((new_root_bank.slot(), new_root_bank.hash())); + rooted_slots.push((new_root_bank.slot(), Some(new_root_bank.hash()))); // As noted, the cluster confirmed root should be descended from // our last root; therefore parent should be set new_root_bank = new_root_bank.parent().unwrap(); } inc_new_counter_info!("load_frozen_forks-cluster-confirmed-root", rooted_slots.len()); if blockstore.is_primary_access() { - blockstore.set_roots(rooted_slots.iter().map(|(slot, _hash)| slot)) - .expect("Blockstore::set_roots should succeed"); - blockstore.set_duplicate_confirmed_slots_and_hashes(rooted_slots.into_iter()) - .expect("Blockstore::set_duplicate_confirmed should succeed"); + blockstore + .mark_slots_as_if_rooted_normally_at_startup(rooted_slots, true) + .expect("Blockstore::mark_slots_as_if_rooted_normally_at_startup() should succeed"); } Some(cluster_root_bank) } else { diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 8d1a872c1..30192a2d0 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -20,14 +20,17 @@ use { cluster_info::Node, contact_info::ContactInfo, gossip_service::discover_cluster, }, solana_ledger::create_new_tmp_ledger, - solana_runtime::genesis_utils::{ - create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo, - ValidatorVoteKeypairs, + solana_runtime::{ + genesis_utils::{ + create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo, + ValidatorVoteKeypairs, + }, + snapshot_config::SnapshotConfig, }, solana_sdk::{ account::{Account, AccountSharedData}, client::SyncClient, - clock::{DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT}, + clock::{Slot, DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT}, commitment_config::CommitmentConfig, epoch_schedule::EpochSchedule, genesis_config::{ClusterType, GenesisConfig}, @@ -52,10 +55,13 @@ use { collections::HashMap, io::{Error, ErrorKind, Result}, iter, + path::{Path, PathBuf}, sync::{Arc, RwLock}, }, }; +const DUMMY_SNAPSHOT_CONFIG_PATH_MARKER: &str = "dummy"; + pub struct ClusterConfig { /// The validator config that should be applied to every node in the cluster pub validator_configs: Vec, @@ -138,6 +144,23 @@ impl LocalCluster { Self::new(&mut config, socket_addr_space) } + fn sync_ledger_path_across_nested_config_fields( + config: &mut ValidatorConfig, + ledger_path: &Path, + ) { + config.account_paths = vec![ledger_path.join("accounts")]; + config.tower_storage = Arc::new(FileTowerStorage::new(ledger_path.to_path_buf())); + if let Some(snapshot_config) = &mut config.snapshot_config { + let dummy: PathBuf = DUMMY_SNAPSHOT_CONFIG_PATH_MARKER.into(); + if snapshot_config.full_snapshot_archives_dir == dummy { + snapshot_config.full_snapshot_archives_dir = ledger_path.to_path_buf(); + } + if snapshot_config.bank_snapshots_dir == dummy { + snapshot_config.bank_snapshots_dir = ledger_path.join("snapshot"); + } + } + } + pub fn new(config: &mut ClusterConfig, socket_addr_space: SocketAddrSpace) -> Self { assert_eq!(config.validator_configs.len(), config.node_stakes.len()); @@ -241,8 +264,7 @@ impl LocalCluster { let leader_contact_info = leader_node.info.clone(); let mut leader_config = safe_clone_config(&config.validator_configs[0]); leader_config.rpc_addrs = Some((leader_node.info.rpc, leader_node.info.rpc_pubsub)); - leader_config.account_paths = vec![leader_ledger_path.join("accounts")]; - leader_config.tower_storage = 
Arc::new(FileTowerStorage::new(leader_ledger_path.clone())); + Self::sync_ledger_path_across_nested_config_fields(&mut leader_config, &leader_ledger_path); let leader_keypair = Arc::new(Keypair::from_bytes(&leader_keypair.to_bytes()).unwrap()); let leader_vote_keypair = Arc::new(Keypair::from_bytes(&leader_vote_keypair.to_bytes()).unwrap()); @@ -445,8 +467,7 @@ impl LocalCluster { let mut config = safe_clone_config(validator_config); config.rpc_addrs = Some((validator_node.info.rpc, validator_node.info.rpc_pubsub)); - config.account_paths = vec![ledger_path.join("accounts")]; - config.tower_storage = Arc::new(FileTowerStorage::new(ledger_path.clone())); + Self::sync_ledger_path_across_nested_config_fields(&mut config, &ledger_path); let voting_keypair = voting_keypair.unwrap(); let validator_server = Validator::new( validator_node, @@ -479,7 +500,7 @@ impl LocalCluster { validator_pubkey } - pub fn ledger_path(&self, validator_pubkey: &Pubkey) -> std::path::PathBuf { + pub fn ledger_path(&self, validator_pubkey: &Pubkey) -> PathBuf { self.validators .get(validator_pubkey) .unwrap() @@ -718,6 +739,19 @@ impl LocalCluster { )), } } + + pub fn create_dummy_load_only_snapshot_config() -> SnapshotConfig { + // DUMMY_SNAPSHOT_CONFIG_PATH_MARKER will be replaced with real value as part of cluster + // node lifecycle. + // There must be some place holder for now... + SnapshotConfig { + full_snapshot_archive_interval_slots: Slot::MAX, + incremental_snapshot_archive_interval_slots: Slot::MAX, + full_snapshot_archives_dir: DUMMY_SNAPSHOT_CONFIG_PATH_MARKER.into(), + bank_snapshots_dir: DUMMY_SNAPSHOT_CONFIG_PATH_MARKER.into(), + ..SnapshotConfig::default() + } + } } impl Cluster for LocalCluster { @@ -790,10 +824,10 @@ impl Cluster for LocalCluster { ) -> ClusterValidatorInfo { // Restart the node let validator_info = &cluster_validator_info.info; - cluster_validator_info.config.account_paths = - vec![validator_info.ledger_path.join("accounts")]; - cluster_validator_info.config.tower_storage = - Arc::new(FileTowerStorage::new(validator_info.ledger_path.clone())); + LocalCluster::sync_ledger_path_across_nested_config_fields( + &mut cluster_validator_info.config, + &validator_info.ledger_path, + ); let restarted_node = Validator::new( node, validator_info.keypair.clone(), diff --git a/local-cluster/tests/common.rs b/local-cluster/tests/common.rs index e4150f027..cf65067b1 100644 --- a/local-cluster/tests/common.rs +++ b/local-cluster/tests/common.rs @@ -103,9 +103,9 @@ pub fn open_blockstore(ledger_path: &Path) -> Blockstore { }) } -pub fn purge_slots(blockstore: &Blockstore, start_slot: Slot, slot_count: Slot) { - blockstore.purge_from_next_slots(start_slot, start_slot + slot_count); - blockstore.purge_slots(start_slot, start_slot + slot_count, PurgeType::Exact); +pub fn purge_slots_with_count(blockstore: &Blockstore, start_slot: Slot, slot_count: Slot) { + blockstore.purge_from_next_slots(start_slot, start_slot + slot_count - 1); + blockstore.purge_slots(start_slot, start_slot + slot_count - 1, PurgeType::Exact); } // Fetches the last vote in the tower, blocking until it has also appeared in blockstore. 
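// Sketch (not from this patch): the rename to purge_slots_with_count() also
// fixes the range math — purging `slot_count` slots starting at `start_slot`
// means an inclusive end of `start_slot + slot_count - 1`. A tiny model of
// that arithmetic, with a Vec standing in for the blockstore's slots:
type Slot = u64;

fn purge_slots_with_count(slots: &mut Vec<Slot>, start_slot: Slot, slot_count: Slot) {
    let end_slot = start_slot + slot_count - 1; // inclusive, like PurgeType::Exact
    slots.retain(|slot| *slot < start_slot || *slot > end_slot);
}

fn main() {
    let mut slots: Vec<Slot> = (0..10).collect();
    purge_slots_with_count(&mut slots, 3, 4); // purges exactly slots 3, 4, 5, 6
    assert_eq!(slots, vec![0, 1, 2, 7, 8, 9]);
}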
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 92d4f3ed2..130f6dec9 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -23,7 +23,10 @@ use { }, solana_download_utils::download_snapshot_archive, solana_gossip::gossip_service::discover_cluster, - solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore}, + solana_ledger::{ + ancestor_iterator::AncestorIterator, bank_forks_utils, blockstore::Blockstore, + blockstore_processor::ProcessOptions, + }, solana_local_cluster::{ cluster::{Cluster, ClusterValidatorInfo}, cluster_tests, @@ -31,9 +34,11 @@ use { validator_configs::*, }, solana_runtime::{ + hardened_unpack::open_genesis_config, snapshot_archive_info::SnapshotArchiveInfoGetter, + snapshot_config::SnapshotConfig, snapshot_package::SnapshotType, - snapshot_utils::{self, ArchiveFormat}, + snapshot_utils::{self, ArchiveFormat, SnapshotVersion}, }, solana_sdk::{ account::AccountSharedData, @@ -42,6 +47,7 @@ use { commitment_config::CommitmentConfig, epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, genesis_config::ClusterType, + hard_forks::HardForks, poh_config::PohConfig, pubkey::Pubkey, signature::{Keypair, Signer}, @@ -57,7 +63,7 @@ use { path::Path, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, Mutex, }, thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, @@ -1927,7 +1933,7 @@ fn do_test_future_tower(cluster_mode: ClusterMode) { purged_slot_before_restart, ); let blockstore = open_blockstore(&val_a_ledger_path); - purge_slots(&blockstore, purged_slot_before_restart, 100); + purge_slots_with_count(&blockstore, purged_slot_before_restart, 100); } cluster.restart_node( @@ -1979,6 +1985,59 @@ fn test_future_tower_master_slave() { do_test_future_tower(ClusterMode::MasterSlave); } +fn restart_whole_cluster_after_hard_fork( + cluster: &Arc>, + validator_a_pubkey: Pubkey, + validator_b_pubkey: Pubkey, + mut validator_a_info: ClusterValidatorInfo, + validator_b_info: ClusterValidatorInfo, +) { + // restart validator A first + let cluster_for_a = cluster.clone(); + let val_a_ledger_path = validator_a_info.info.ledger_path.clone(); + + // Spawn a thread because wait_for_supermajority blocks in Validator::new()! 
+ let thread = std::thread::spawn(move || { + let restart_context = cluster_for_a + .lock() + .unwrap() + .create_restart_context(&validator_a_pubkey, &mut validator_a_info); + let restarted_validator_info = LocalCluster::restart_node_with_context( + validator_a_info, + restart_context, + SocketAddrSpace::Unspecified, + ); + cluster_for_a + .lock() + .unwrap() + .add_node(&validator_a_pubkey, restarted_validator_info); + }); + + // test validator A actually to wait for supermajority + let mut last_vote = None; + for _ in 0..10 { + sleep(Duration::from_millis(1000)); + + let (new_last_vote, _) = + last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap(); + if let Some(last_vote) = last_vote { + assert_eq!(last_vote, new_last_vote); + } else { + last_vote = Some(new_last_vote); + } + } + + // restart validator B normally + cluster.lock().unwrap().restart_node( + &validator_b_pubkey, + validator_b_info, + SocketAddrSpace::Unspecified, + ); + + // validator A should now start so join its thread here + thread.join().unwrap(); +} + #[test] fn test_hard_fork_invalidates_tower() { solana_logger::setup_with_default(RUST_LOG_FILTER); @@ -2038,6 +2097,8 @@ fn test_hard_fork_invalidates_tower() { let mut validator_b_info = cluster.lock().unwrap().exit_node(&validator_b_pubkey); // setup hard fork at slot < a previously rooted slot! + // hard fork earlier than root is very unrealistic in the wild, but it's handy for + // persistent tower's lockout behavior... let hard_fork_slot = min_root - 5; let hard_fork_slots = Some(vec![hard_fork_slot]); let mut hard_forks = solana_sdk::hard_forks::HardForks::default(); @@ -2060,53 +2121,18 @@ fn test_hard_fork_invalidates_tower() { { let blockstore_a = open_blockstore(&validator_a_info.info.ledger_path); let blockstore_b = open_blockstore(&validator_b_info.info.ledger_path); - purge_slots(&blockstore_a, hard_fork_slot + 1, 100); - purge_slots(&blockstore_b, hard_fork_slot + 1, 100); + purge_slots_with_count(&blockstore_a, hard_fork_slot + 1, 100); + purge_slots_with_count(&blockstore_b, hard_fork_slot + 1, 100); } - // restart validator A first - let cluster_for_a = cluster.clone(); - // Spawn a thread because wait_for_supermajority blocks in Validator::new()! 
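// Sketch (not from this patch): the restart_whole_cluster_after_hard_fork()
// helper factored out above follows a generic pattern — kick off a restart
// that blocks (wait_for_supermajority) on a worker thread, poll from the test
// thread to confirm the node really is parked (its last vote must not move),
// then unblock it and join. A std-only model of that orchestration; the
// atomics are stand-ins for the tower file and cluster state:
use std::{
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc,
    },
    thread,
    time::Duration,
};

fn main() {
    let last_vote = Arc::new(AtomicU64::new(42));
    let supermajority_reached = Arc::new(AtomicBool::new(false));

    // "validator A": blocks until supermajority, like Validator::new() would
    let (vote, reached) = (last_vote.clone(), supermajority_reached.clone());
    let thread_a = thread::spawn(move || {
        while !reached.load(Ordering::Relaxed) {
            thread::sleep(Duration::from_millis(10));
        }
        vote.fetch_add(1, Ordering::Relaxed); // only votes once unblocked
    });

    // test thread: the last vote must stay put while A is waiting
    let parked_vote = last_vote.load(Ordering::Relaxed);
    for _ in 0..10 {
        thread::sleep(Duration::from_millis(10));
        assert_eq!(last_vote.load(Ordering::Relaxed), parked_vote);
    }

    // "validator B" restarting normally is what lets A reach supermajority
    supermajority_reached.store(true, Ordering::Relaxed);
    thread_a.join().unwrap();
    assert!(last_vote.load(Ordering::Relaxed) > parked_vote);
}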
- let thread = std::thread::spawn(move || { - let restart_context = cluster_for_a - .lock() - .unwrap() - .create_restart_context(&validator_a_pubkey, &mut validator_a_info); - let restarted_validator_info = LocalCluster::restart_node_with_context( - validator_a_info, - restart_context, - SocketAddrSpace::Unspecified, - ); - cluster_for_a - .lock() - .unwrap() - .add_node(&validator_a_pubkey, restarted_validator_info); - }); - - // test validator A actually to wait for supermajority - let mut last_vote = None; - for _ in 0..10 { - sleep(Duration::from_millis(1000)); - - let (new_last_vote, _) = - last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap(); - if let Some(last_vote) = last_vote { - assert_eq!(last_vote, new_last_vote); - } else { - last_vote = Some(new_last_vote); - } - } - - // restart validator B normally - cluster.lock().unwrap().restart_node( - &validator_b_pubkey, + restart_whole_cluster_after_hard_fork( + &cluster, + validator_a_pubkey, + validator_b_pubkey, + validator_a_info, validator_b_info, - SocketAddrSpace::Unspecified, ); - // validator A should now start so join its thread here - thread.join().unwrap(); - // new slots should be rooted after hard-fork cluster relaunch cluster .lock() @@ -2120,6 +2146,224 @@ fn test_run_test_load_program_accounts_root() { run_test_load_program_accounts(CommitmentConfig::finalized()); } +fn create_simple_snapshot_config(ledger_path: &Path) -> SnapshotConfig { + SnapshotConfig { + full_snapshot_archives_dir: ledger_path.to_path_buf(), + bank_snapshots_dir: ledger_path.join("snapshot"), + ..SnapshotConfig::default() + } +} + +fn create_snapshot_to_hard_fork( + blockstore: &Blockstore, + snapshot_slot: Slot, + hard_forks: Vec, +) { + let process_options = ProcessOptions { + halt_at_slot: Some(snapshot_slot), + new_hard_forks: Some(hard_forks), + poh_verify: false, + ..ProcessOptions::default() + }; + let ledger_path = blockstore.ledger_path(); + let genesis_config = open_genesis_config(ledger_path, u64::max_value()); + let snapshot_config = Some(create_simple_snapshot_config(ledger_path)); + let (bank_forks, ..) 
= bank_forks_utils::load( + &genesis_config, + blockstore, + vec![ledger_path.join("accounts")], + None, + snapshot_config.as_ref(), + process_options, + None, + None, + None, + ) + .unwrap(); + let bank = bank_forks.read().unwrap().get(snapshot_slot).unwrap(); + let full_snapshot_archive_info = snapshot_utils::bank_to_full_snapshot_archive( + ledger_path, + &bank, + Some(SnapshotVersion::default()), + ledger_path, + ledger_path, + ArchiveFormat::TarZstd, + 1, + 1, + ) + .unwrap(); + info!( + "Successfully created snapshot for slot {}, hash {}: {}", + bank.slot(), + bank.hash(), + full_snapshot_archive_info.path().display(), + ); +} + +#[test] +#[serial] +fn test_hard_fork_with_gap_in_roots() { + solana_logger::setup_with_default(RUST_LOG_FILTER); + + // First set up the cluster with 2 nodes + let slots_per_epoch = 2048; + let node_stakes = vec![60, 40]; + + let validator_keys = vec![ + "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4", + "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8", + ] + .iter() + .map(|s| (Arc::new(Keypair::from_base58_string(s)), true)) + .take(node_stakes.len()) + .collect::>(); + let validators = validator_keys + .iter() + .map(|(kp, _)| kp.pubkey()) + .collect::>(); + + let validator_a_pubkey = validators[0]; + let validator_b_pubkey = validators[1]; + + let validator_config = ValidatorConfig { + snapshot_config: Some(LocalCluster::create_dummy_load_only_snapshot_config()), + ..ValidatorConfig::default() + }; + let mut config = ClusterConfig { + cluster_lamports: 100_000, + node_stakes: node_stakes.clone(), + validator_configs: make_identical_validator_configs(&validator_config, node_stakes.len()), + validator_keys: Some(validator_keys), + slots_per_epoch, + stakers_slot_offset: slots_per_epoch, + skip_warmup_slots: true, + ..ClusterConfig::default() + }; + let cluster = std::sync::Arc::new(std::sync::Mutex::new(LocalCluster::new( + &mut config, + SocketAddrSpace::Unspecified, + ))); + + let val_a_ledger_path = cluster.lock().unwrap().ledger_path(&validator_a_pubkey); + let val_b_ledger_path = cluster.lock().unwrap().ledger_path(&validator_b_pubkey); + + let min_last_vote = 45; + let min_root = 10; + loop { + sleep(Duration::from_millis(100)); + + if let Some((last_vote, _)) = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey) { + if last_vote >= min_last_vote + && root_in_tower(&val_a_ledger_path, &validator_a_pubkey) > Some(min_root) + { + break; + } + } + } + + // stop all nodes of the cluster + let mut validator_a_info = cluster.lock().unwrap().exit_node(&validator_a_pubkey); + let mut validator_b_info = cluster.lock().unwrap().exit_node(&validator_b_pubkey); + + // hard fork slot is effectively a (possibly skipping) new root. + // assert that the precondition of validator a to test gap between + // blockstore and hard fork... 
+ let hard_fork_slot = min_last_vote - 5; + assert!(hard_fork_slot > root_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap()); + + let hard_fork_slots = Some(vec![hard_fork_slot]); + let mut hard_forks = HardForks::default(); + hard_forks.register(hard_fork_slot); + + let expected_shred_version = solana_sdk::shred_version::compute_shred_version( + &cluster.lock().unwrap().genesis_config.hash(), + Some(&hard_forks), + ); + + // create hard-forked snapshot only for validator a, emulating the manual cluster restart + // procedure with `solana-ledger-tool create-snapshot` + let genesis_slot = 0; + { + let blockstore_a = Blockstore::open(&val_a_ledger_path).unwrap(); + create_snapshot_to_hard_fork(&blockstore_a, hard_fork_slot, vec![hard_fork_slot]); + + // Intentionally make solana-validator unbootable by replaying blocks from the genesis to + // ensure the hard-forked snapshot is used always. Otherwise, we couldn't create a gap + // in the ledger roots column family reliably. + // There was a bug which caused the hard-forked snapshot at an unrooted slot to forget + // to root some slots (thus, creating a gap in roots, which shouldn't happen). + purge_slots_with_count(&blockstore_a, genesis_slot, 1); + + let next_slot = genesis_slot + 1; + let mut meta = blockstore_a.meta(next_slot).unwrap().unwrap(); + meta.unset_parent(); + blockstore_a.put_meta(next_slot, &meta).unwrap(); + } + + // strictly speaking, new_hard_forks isn't needed for validator a. + // but when snapshot loading isn't working, you might see: + // shred version mismatch: expected NNNN found: MMMM + //validator_a_info.config.new_hard_forks = hard_fork_slots.clone(); + + // effectively pass the --hard-fork parameter to validator b + validator_b_info.config.new_hard_forks = hard_fork_slots; + + validator_a_info.config.wait_for_supermajority = Some(hard_fork_slot); + validator_a_info.config.expected_shred_version = Some(expected_shred_version); + + validator_b_info.config.wait_for_supermajority = Some(hard_fork_slot); + validator_b_info.config.expected_shred_version = Some(expected_shred_version); + + restart_whole_cluster_after_hard_fork( + &cluster, + validator_a_pubkey, + validator_b_pubkey, + validator_a_info, + validator_b_info, + ); + // new slots should be rooted after hard-fork cluster relaunch + cluster + .lock() + .unwrap() + .check_for_new_roots(16, "hard fork", SocketAddrSpace::Unspecified); + + // drop everything to open blockstores below + drop(cluster); + + let (common_last_vote, common_root) = { + let (last_vote_a, _) = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap(); + let (last_vote_b, _) = last_vote_in_tower(&val_b_ledger_path, &validator_b_pubkey).unwrap(); + let root_a = root_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap(); + let root_b = root_in_tower(&val_b_ledger_path, &validator_b_pubkey).unwrap(); + (last_vote_a.min(last_vote_b), root_a.min(root_b)) + }; + + let blockstore_a = Blockstore::open(&val_a_ledger_path).unwrap(); + let blockstore_b = Blockstore::open(&val_b_ledger_path).unwrap(); + + // collect all slot/root parents + let mut slots_a = AncestorIterator::new(common_last_vote, &blockstore_a).collect::>(); + let mut roots_a = blockstore_a + .reversed_rooted_slot_iterator(common_root) + .unwrap() + .collect::>(); + // artifically restore the forcibly purged genesis only for the validator A just for the sake of + // the final assertions. 
+ slots_a.push(genesis_slot); + roots_a.push(genesis_slot); + + let slots_b = AncestorIterator::new(common_last_vote, &blockstore_b).collect::>(); + let roots_b = blockstore_b + .reversed_rooted_slot_iterator(common_root) + .unwrap() + .collect::>(); + + // compare them all! + assert_eq!((&slots_a, &roots_a), (&slots_b, &roots_b)); + assert_eq!(&slots_a[slots_a.len() - roots_a.len()..].to_vec(), &roots_a); + assert_eq!(&slots_b[slots_b.len() - roots_b.len()..].to_vec(), &roots_b); +} + #[test] #[serial] fn test_restart_tower_rollback() { diff --git a/local-cluster/tests/local_cluster_flakey.rs b/local-cluster/tests/local_cluster_flakey.rs index 6dc530dcb..88a87b27a 100644 --- a/local-cluster/tests/local_cluster_flakey.rs +++ b/local-cluster/tests/local_cluster_flakey.rs @@ -243,7 +243,7 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b remove_tower(&val_c_ledger_path, &validator_b_pubkey); let blockstore = open_blockstore(&val_c_ledger_path); - purge_slots(&blockstore, base_slot + 1, truncated_slots); + purge_slots_with_count(&blockstore, base_slot + 1, truncated_slots); } info!("Create validator A's ledger"); { @@ -257,7 +257,7 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b copy_blocks(b_last_vote, &b_blockstore, &a_blockstore); // Purge uneccessary slots - purge_slots(&a_blockstore, next_slot_on_a + 1, truncated_slots); + purge_slots_with_count(&a_blockstore, next_slot_on_a + 1, truncated_slots); } // This should be guaranteed because we waited for validator `A` to vote on a slot > `next_slot_on_a` @@ -270,7 +270,7 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b { let blockstore = open_blockstore(&val_a_ledger_path); - purge_slots(&blockstore, next_slot_on_a + 1, truncated_slots); + purge_slots_with_count(&blockstore, next_slot_on_a + 1, truncated_slots); if !with_tower { info!("Removing tower!"); remove_tower(&val_a_ledger_path, &validator_a_pubkey); @@ -281,7 +281,7 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b // hasn't gotten the heavier fork from validator C yet. // Then it will be stuck on 27 unable to switch because C doesn't // have enough stake to generate a switching proof - purge_slots(&blockstore, next_slot_on_a, truncated_slots); + purge_slots_with_count(&blockstore, next_slot_on_a, truncated_slots); } else { info!("Not removing tower!"); } diff --git a/local-cluster/tests/local_cluster_slow_2.rs b/local-cluster/tests/local_cluster_slow_2.rs index c2f10cade..6488ddea1 100644 --- a/local-cluster/tests/local_cluster_slow_2.rs +++ b/local-cluster/tests/local_cluster_slow_2.rs @@ -367,7 +367,7 @@ fn test_slot_hash_expiry() { // Get rid of any slots past common_ancestor_slot info!("Removing extra slots from B's blockstore"); let blockstore = open_blockstore(&b_ledger_path); - purge_slots(&blockstore, common_ancestor_slot + 1, 100); + purge_slots_with_count(&blockstore, common_ancestor_slot + 1, 100); } info!(
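// Sketch (not from this patch): the closing assertions of
// test_hard_fork_with_gap_in_roots boil down to two properties per validator —
// both nodes agree on ancestors and roots, and the rooted slots are exactly
// the trailing portion of the ancestor chain (i.e. no gap in roots). The same
// check over plain Vecs, with both lists in the test's descending slot order;
// the example slot numbers are illustrative only:
type Slot = u64;

fn roots_are_tail_of_ancestors(ancestors: &[Slot], roots: &[Slot]) -> bool {
    roots.len() <= ancestors.len() && &ancestors[ancestors.len() - roots.len()..] == roots
}

fn main() {
    // e.g. last vote near 60, hard-fork root at 55, genesis restored at the end
    let ancestors: Vec<Slot> = vec![59, 58, 57, 56, 55, 54, 53, 0];
    let contiguous_roots: Vec<Slot> = vec![55, 54, 53, 0];
    assert!(roots_are_tail_of_ancestors(&ancestors, &contiguous_roots));

    // a gap (54 missing) breaks the suffix property the test guards against
    let gapped_roots: Vec<Slot> = vec![55, 53, 0];
    assert!(!roots_are_tail_of_ancestors(&ancestors, &gapped_roots));
}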