diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index 003cc100e..09f313969 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -1,7 +1,13 @@ -use solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}; +use solana_gossip::cluster_info::{ + ClusterInfo, MAX_INCREMENTAL_SNAPSHOT_HASHES, MAX_SNAPSHOT_HASHES, +}; use solana_runtime::{ snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, + snapshot_hash::{ + FullSnapshotHash, FullSnapshotHashes, IncrementalSnapshotHash, IncrementalSnapshotHashes, + StartingSnapshotHashes, + }, snapshot_package::{PendingSnapshotPackage, SnapshotType}, snapshot_utils, }; @@ -22,26 +28,34 @@ pub struct SnapshotPackagerService { impl SnapshotPackagerService { pub fn new( pending_snapshot_package: PendingSnapshotPackage, - starting_snapshot_hash: Option<(Slot, Hash)>, + starting_snapshot_hashes: Option, exit: &Arc, cluster_info: &Arc, snapshot_config: SnapshotConfig, ) -> Self { let exit = exit.clone(); let cluster_info = cluster_info.clone(); - let max_snapshot_hashes = std::cmp::min( + let max_full_snapshot_hashes = std::cmp::min( MAX_SNAPSHOT_HASHES, snapshot_config.maximum_full_snapshot_archives_to_retain, ); + let max_incremental_snapshot_hashes = std::cmp::min( + MAX_INCREMENTAL_SNAPSHOT_HASHES, + snapshot_config.maximum_incremental_snapshot_archives_to_retain, + ); let t_snapshot_packager = Builder::new() .name("snapshot-packager".to_string()) .spawn(move || { - let mut hashes = vec![]; - if let Some(starting_snapshot_hash) = starting_snapshot_hash { - hashes.push(starting_snapshot_hash); - } - cluster_info.push_snapshot_hashes(hashes.clone()); + let mut snapshot_gossip_manager = SnapshotGossipManager { + cluster_info, + max_full_snapshot_hashes, + max_incremental_snapshot_hashes, + full_snapshot_hashes: FullSnapshotHashes::default(), + incremental_snapshot_hashes: IncrementalSnapshotHashes::default(), + }; + snapshot_gossip_manager.push_starting_snapshot_hashes(starting_snapshot_hashes); + loop { if exit.load(Ordering::Relaxed) { break; @@ -64,15 +78,10 @@ impl SnapshotPackagerService { ) .expect("failed to archive snapshot package"); - // NOTE: For backwards compatibility with version <=1.7, only _full_ snapshots - // can have their hashes pushed out to the cluster. - if snapshot_package.snapshot_type == SnapshotType::FullSnapshot { - hashes.push((snapshot_package.slot(), *snapshot_package.hash())); - while hashes.len() > max_snapshot_hashes { - hashes.remove(0); - } - cluster_info.push_snapshot_hashes(hashes.clone()); - } + snapshot_gossip_manager.push_snapshot_hash( + snapshot_package.snapshot_type, + (snapshot_package.slot(), *snapshot_package.hash()), + ); } }) .unwrap(); @@ -87,6 +96,105 @@ impl SnapshotPackagerService { } } +struct SnapshotGossipManager { + cluster_info: Arc, + max_full_snapshot_hashes: usize, + max_incremental_snapshot_hashes: usize, + full_snapshot_hashes: FullSnapshotHashes, + incremental_snapshot_hashes: IncrementalSnapshotHashes, +} + +impl SnapshotGossipManager { + /// If there were starting snapshot hashes, add those to their respective vectors, then push + /// those vectors to the cluster via CRDS. + fn push_starting_snapshot_hashes( + &mut self, + starting_snapshot_hashes: Option, + ) { + if let Some(starting_snapshot_hashes) = starting_snapshot_hashes { + let starting_full_snapshot_hash = starting_snapshot_hashes.full; + self.push_full_snapshot_hash(starting_full_snapshot_hash); + + if let Some(starting_incremental_snapshot_hash) = starting_snapshot_hashes.incremental { + self.push_incremental_snapshot_hash(starting_incremental_snapshot_hash); + }; + } + } + + /// Add `snapshot_hash` to its respective vector of hashes, then push that vector to the + /// cluster via CRDS. + fn push_snapshot_hash(&mut self, snapshot_type: SnapshotType, snapshot_hash: (Slot, Hash)) { + match snapshot_type { + SnapshotType::FullSnapshot => { + self.push_full_snapshot_hash(FullSnapshotHash { + hash: snapshot_hash, + }); + } + SnapshotType::IncrementalSnapshot(base_slot) => { + let latest_full_snapshot_hash = *self.full_snapshot_hashes.hashes.last().unwrap(); + assert_eq!( + base_slot, latest_full_snapshot_hash.0, + "the incremental snapshot's base slot ({}) must match the latest full snapshot hash's slot ({})", + base_slot, latest_full_snapshot_hash.0, + ); + self.push_incremental_snapshot_hash(IncrementalSnapshotHash { + base: latest_full_snapshot_hash, + hash: snapshot_hash, + }); + } + } + } + + /// Add `full_snapshot_hash` to the vector of full snapshot hashes, then push that vector to + /// the cluster via CRDS. + fn push_full_snapshot_hash(&mut self, full_snapshot_hash: FullSnapshotHash) { + self.full_snapshot_hashes + .hashes + .push(full_snapshot_hash.hash); + while self.full_snapshot_hashes.hashes.len() > self.max_full_snapshot_hashes { + self.full_snapshot_hashes.hashes.remove(0); + } + self.cluster_info + .push_snapshot_hashes(self.full_snapshot_hashes.hashes.clone()); + } + + /// Add `incremental_snapshot_hash` to the vector of incremental snapshot hashes, then push + /// that vector to the cluster via CRDS. + fn push_incremental_snapshot_hash( + &mut self, + incremental_snapshot_hash: IncrementalSnapshotHash, + ) { + // If the base snapshot hash is different from the one in IncrementalSnapshotHashes, then + // that means the old incremental snapshot hashes are no longer valid, so clear them all + // out. + if incremental_snapshot_hash.base != self.incremental_snapshot_hashes.base { + self.incremental_snapshot_hashes.hashes.clear(); + self.incremental_snapshot_hashes.base = incremental_snapshot_hash.base; + } + + self.incremental_snapshot_hashes + .hashes + .push(incremental_snapshot_hash.hash); + while self.incremental_snapshot_hashes.hashes.len() > self.max_incremental_snapshot_hashes { + self.incremental_snapshot_hashes.hashes.remove(0); + } + // Pushing incremental snapshot hashes to the cluster should never fail. The only error + // case is when the length of the hashes is too big, but we account for that with + // `max_incremental_snapshot_hashes`. If this call ever does error, it's a programmer bug! + // Check to see what changed in `push_incremental_snapshot_hashes()` and handle the new + // error condition here. + self.cluster_info + .push_incremental_snapshot_hashes( + self.incremental_snapshot_hashes.base, + self.incremental_snapshot_hashes.hashes.clone(), + ) + .expect( + "Bug! The programmer contract has changed for push_incremental_snapshot_hashes() \ + and a new error case has been added, which has not been handled here.", + ); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/src/validator.rs b/core/src/validator.rs index 8936063a3..633491db3 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -71,6 +71,7 @@ use { hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, + snapshot_hash::StartingSnapshotHashes, snapshot_package::{AccountsPackageSender, PendingSnapshotPackage}, snapshot_utils, }, @@ -417,7 +418,7 @@ impl Validator { completed_slots_receiver, leader_schedule_cache, last_full_snapshot_slot, - snapshot_hash, + starting_snapshot_hashes, TransactionHistoryServices { transaction_status_sender, transaction_status_service, @@ -695,7 +696,7 @@ impl Validator { let snapshot_packager_service = SnapshotPackagerService::new( pending_snapshot_package.clone(), - snapshot_hash, + starting_snapshot_hashes, &exit, &cluster_info, snapshot_config.clone(), @@ -1149,7 +1150,7 @@ fn new_banks_from_ledger( CompletedSlotsReceiver, LeaderScheduleCache, Option, - Option<(Slot, Hash)>, + Option, TransactionHistoryServices, Tower, ) { @@ -1244,27 +1245,31 @@ fn new_banks_from_ledger( TransactionHistoryServices::default() }; - let (mut bank_forks, mut leader_schedule_cache, last_full_snapshot_slot, snapshot_hash) = - bank_forks_utils::load( - &genesis_config, - &blockstore, - config.account_paths.clone(), - config.account_shrink_paths.clone(), - config.snapshot_config.as_ref(), - process_options, - transaction_history_services - .transaction_status_sender - .as_ref(), - transaction_history_services - .cache_block_meta_sender - .as_ref(), - accounts_package_sender, - accounts_update_notifier, - ) - .unwrap_or_else(|err| { - error!("Failed to load ledger: {:?}", err); - abort() - }); + let ( + mut bank_forks, + mut leader_schedule_cache, + last_full_snapshot_slot, + starting_snapshot_hashes, + ) = bank_forks_utils::load( + &genesis_config, + &blockstore, + config.account_paths.clone(), + config.account_shrink_paths.clone(), + config.snapshot_config.as_ref(), + process_options, + transaction_history_services + .transaction_status_sender + .as_ref(), + transaction_history_services + .cache_block_meta_sender + .as_ref(), + accounts_package_sender, + accounts_update_notifier, + ) + .unwrap_or_else(|err| { + error!("Failed to load ledger: {:?}", err); + abort() + }); if let Some(warp_slot) = config.warp_slot { let snapshot_config = config.snapshot_config.as_ref().unwrap_or_else(|| { @@ -1344,7 +1349,7 @@ fn new_banks_from_ledger( completed_slots_receiver, leader_schedule_cache, last_full_snapshot_slot, - snapshot_hash, + starting_snapshot_hashes, transaction_history_services, tower, ) diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 0f2cdf5ef..1b5258475 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -10,11 +10,15 @@ use crate::{ use log::*; use solana_entry::entry::VerifyRecyclers; use solana_runtime::{ - accounts_update_notifier_interface::AccountsUpdateNotifier, bank_forks::BankForks, - snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, - snapshot_package::AccountsPackageSender, snapshot_utils, + accounts_update_notifier_interface::AccountsUpdateNotifier, + bank_forks::BankForks, + snapshot_archive_info::SnapshotArchiveInfoGetter, + snapshot_config::SnapshotConfig, + snapshot_hash::{FullSnapshotHash, IncrementalSnapshotHash, StartingSnapshotHashes}, + snapshot_package::AccountsPackageSender, + snapshot_utils, }; -use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash}; +use solana_sdk::{clock::Slot, genesis_config::GenesisConfig}; use std::{fs, path::PathBuf, process, result}; pub type LoadResult = result::Result< @@ -22,14 +26,14 @@ pub type LoadResult = result::Result< BankForks, LeaderScheduleCache, Option, - Option<(Slot, Hash)>, + Option, ), BlockstoreProcessorError, >; fn to_loadresult( bpr: BlockstoreProcessorResult, - snapshot_slot_and_hash: Option<(Slot, Hash)>, + starting_snapshot_hashes: Option, ) -> LoadResult { bpr.map( |(bank_forks, leader_schedule_cache, last_full_snapshot_slot)| { @@ -37,7 +41,7 @@ fn to_loadresult( bank_forks, leader_schedule_cache, last_full_snapshot_slot, - snapshot_slot_and_hash, + starting_snapshot_hashes, ) }, ) @@ -150,7 +154,7 @@ fn load_from_snapshot( process::exit(1); } - let (deserialized_bank, timings, full_snapshot_archive_info, _) = + let (deserialized_bank, timings, full_snapshot_archive_info, incremental_snapshot_archive_info) = snapshot_utils::bank_from_latest_snapshot_archives( &snapshot_config.bank_snapshots_dir, &snapshot_config.snapshot_archives_dir, @@ -171,15 +175,31 @@ fn load_from_snapshot( ) .expect("Load from snapshot failed"); - let deserialized_bank_slot_and_hash = ( - deserialized_bank.slot(), - deserialized_bank.get_accounts_hash(), - ); - if let Some(shrink_paths) = shrink_paths { deserialized_bank.set_shrink_paths(shrink_paths); } + let starting_full_snapshot_hash = FullSnapshotHash { + hash: ( + full_snapshot_archive_info.slot(), + *full_snapshot_archive_info.hash(), + ), + }; + let starting_incremental_snapshot_hash = + incremental_snapshot_archive_info.map(|incremental_snapshot_archive_info| { + IncrementalSnapshotHash { + base: starting_full_snapshot_hash.hash, + hash: ( + incremental_snapshot_archive_info.slot(), + *incremental_snapshot_archive_info.hash(), + ), + } + }); + let starting_snapshot_hashes = StartingSnapshotHashes { + full: starting_full_snapshot_hash, + incremental: starting_incremental_snapshot_hash, + }; + to_loadresult( blockstore_processor::process_blockstore_from_root( blockstore, @@ -193,6 +213,6 @@ fn load_from_snapshot( timings, full_snapshot_archive_info.slot(), ), - Some(deserialized_bank_slot_and_hash), + Some(starting_snapshot_hashes), ) } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 6ff0f4ea4..52a6251a2 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -42,6 +42,7 @@ pub mod serde_snapshot; mod shared_buffer_reader; pub mod snapshot_archive_info; pub mod snapshot_config; +pub mod snapshot_hash; pub mod snapshot_package; pub mod snapshot_utils; pub mod sorted_storages; diff --git a/runtime/src/snapshot_hash.rs b/runtime/src/snapshot_hash.rs new file mode 100644 index 000000000..aa68bc744 --- /dev/null +++ b/runtime/src/snapshot_hash.rs @@ -0,0 +1,44 @@ +//! Helper types and functions for handling and dealing with snapshot hashes. +use solana_sdk::{clock::Slot, hash::Hash}; + +/// At startup, when loading from snapshots, the starting snapshot hashes need to be passed to +/// SnapshotPackagerService, which is in charge of pushing the hashes to CRDS. This struct wraps +/// up those values make it easier to pass from bank_forks_utils, through validator, to +/// SnapshotPackagerService. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub struct StartingSnapshotHashes { + pub full: FullSnapshotHash, + pub incremental: Option, +} + +/// Used by SnapshotPackagerService and SnapshotGossipManager, this struct adds type safety to +/// ensure a full snapshot hash is pushed to the right CRDS. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub struct FullSnapshotHash { + pub hash: (Slot, Hash), +} + +/// Used by SnapshotPackagerService and SnapshotGossipManager, this struct adds type safety to +/// ensure an incremental snapshot hash is pushed to the right CRDS. `base` is the (full) snapshot +/// this incremental snapshot (`hash`) is based on. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub struct IncrementalSnapshotHash { + pub base: (Slot, Hash), + pub hash: (Slot, Hash), +} + +/// FullSnapshotHashes is used by SnapshotPackagerService to collect the snapshot hashes from full +/// snapshots and then push those hashes to CRDS. +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct FullSnapshotHashes { + pub hashes: Vec<(Slot, Hash)>, +} + +/// IncrementalSnapshotHashes is used by SnapshotPackagerService to collect the snapshot hashes +/// from incremental snapshots and then push those hashes to CRDS. `base` is the (full) snapshot +/// all the incremental snapshots (`hashes`) are based on. +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct IncrementalSnapshotHashes { + pub base: (Slot, Hash), + pub hashes: Vec<(Slot, Hash)>, +}