Revert "Make startup aware of Incremental Snapshots (#19550)" (#19599)

This reverts commit d45ced0a5d.
Author: Brooks Prumo
Date:   2021-09-02 19:14:41 -05:00 (committed via GitHub)
Parent: d45ced0a5d
Commit: e9374d32a3
8 changed files with 92 additions and 337 deletions
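
For orientation, the result tuples of blockstore processing and ledger loading shrink back to their pre-#19550 shapes: the Option<Slot> carrying the last full snapshot slot is dropped, and callers no longer thread an AccountsPackageSender through startup. A minimal, self-contained Rust sketch of just the tuple shapes, with placeholder aliases standing in for the real Solana types:

type BankForks = ();
type LeaderScheduleCache = ();
type Slot = u64;
type Hash = [u8; 32];

// Shapes before the revert (i.e. with #19550 applied), per the removed lines below.
type BlockstoreProcessorInnerBefore = (BankForks, LeaderScheduleCache, Option<Slot>);
type LoadResultInnerBefore = (BankForks, LeaderScheduleCache, Option<Slot>, Option<(Slot, Hash)>);

// Shapes this revert restores, per the added lines below.
type BlockstoreProcessorInnerAfter = (BankForks, LeaderScheduleCache);
type LoadResultInnerAfter = (BankForks, LeaderScheduleCache, Option<(Slot, Hash)>);

fn main() {
    // Only the tuple arity changes; the snapshot slot/hash pair used for
    // verification is still returned by the loader after the revert.
    let _before: BlockstoreProcessorInnerBefore = ((), (), None);
    let _after: BlockstoreProcessorInnerAfter = ((), ());
}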

View File

@@ -572,17 +572,8 @@ mod tests {
             full_leader_cache: true,
             ..ProcessOptions::default()
         };
-        let (accounts_package_sender, _) = channel();
-        let (bank_forks, cached_leader_schedule, _) = process_blockstore(
-            &genesis_config,
-            &blockstore,
-            Vec::new(),
-            opts,
-            None,
-            None,
-            accounts_package_sender,
-        )
-        .unwrap();
+        let (bank_forks, cached_leader_schedule) =
+            process_blockstore(&genesis_config, &blockstore, Vec::new(), opts, None).unwrap();
         let leader_schedule_cache = Arc::new(cached_leader_schedule);
         let bank_forks = Arc::new(RwLock::new(bank_forks));

View File

@@ -44,10 +44,10 @@ use solana_runtime::{
     bank_forks::BankForks,
     commitment::BlockCommitmentCache,
     snapshot_config::SnapshotConfig,
-    snapshot_package::{AccountsPackageReceiver, AccountsPackageSender, PendingSnapshotPackage},
+    snapshot_package::PendingSnapshotPackage,
     vote_sender_types::ReplayVoteSender,
 };
-use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair};
+use solana_sdk::{pubkey::Pubkey, signature::Keypair};
 use std::{
     boxed::Box,
     collections::HashSet,
@@ -135,8 +135,6 @@ impl Tvu {
         tvu_config: TvuConfig,
         max_slots: &Arc<MaxSlots>,
         cost_model: &Arc<RwLock<CostModel>>,
-        accounts_package_channel: (AccountsPackageSender, AccountsPackageReceiver),
-        last_full_snapshot_slot: Option<Slot>,
     ) -> Self {
         let Sockets {
             repair: repair_socket,
@@ -214,9 +212,9 @@ impl Tvu {
                 (Some(snapshot_config), Some(pending_snapshot_package))
             })
             .unwrap_or((None, None));
-        let (accounts_package_sender, accounts_package_receiver) = accounts_package_channel;
+        let (accounts_hash_sender, accounts_hash_receiver) = channel();
         let accounts_hash_verifier = AccountsHashVerifier::new(
-            accounts_package_receiver,
+            accounts_hash_receiver,
             pending_snapshot_package,
             exit,
             cluster_info,
@@ -226,19 +224,20 @@ impl Tvu {
             snapshot_config.clone(),
         );
-        let (snapshot_request_sender, snapshot_request_handler) = match snapshot_config {
-            None => (None, None),
-            Some(snapshot_config) => {
+        let (snapshot_request_sender, snapshot_request_handler) = {
+            snapshot_config
+                .map(|snapshot_config| {
                     let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
                     (
                         Some(snapshot_request_sender),
                         Some(SnapshotRequestHandler {
                             snapshot_config,
                             snapshot_request_receiver,
-                            accounts_package_sender,
+                            accounts_package_sender: accounts_hash_sender,
                         }),
                     )
-            }
+                })
+                .unwrap_or((None, None))
         };
         let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
@@ -341,7 +340,7 @@ impl Tvu {
             tvu_config.accounts_db_caching_enabled,
             tvu_config.test_hash_calculation,
             tvu_config.use_index_hash_calculation,
-            last_full_snapshot_slot,
+            None,
         );
         Tvu {
@@ -435,7 +434,6 @@ pub mod tests {
         let (_, gossip_confirmed_slots_receiver) = unbounded();
         let bank_forks = Arc::new(RwLock::new(bank_forks));
         let tower = Tower::default();
-        let accounts_package_channel = channel();
         let tvu = Tvu::new(
             &vote_keypair.pubkey(),
             Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
@@ -479,8 +477,6 @@ pub mod tests {
             TvuConfig::default(),
             &Arc::new(MaxSlots::default()),
             &Arc::new(RwLock::new(CostModel::default())),
-            accounts_package_channel,
-            None,
         );
         exit.store(true, Ordering::Relaxed);
         tvu.join().unwrap();

View File

@@ -69,7 +69,7 @@ use {
         hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
         snapshot_archive_info::SnapshotArchiveInfoGetter,
         snapshot_config::SnapshotConfig,
-        snapshot_package::{AccountsPackageSender, PendingSnapshotPackage},
+        snapshot_package::PendingSnapshotPackage,
         snapshot_utils,
     },
     solana_sdk::{
@@ -92,7 +92,7 @@ use {
         path::{Path, PathBuf},
         sync::{
             atomic::{AtomicBool, AtomicU64, Ordering},
-            mpsc::{channel, Receiver},
+            mpsc::Receiver,
             Arc, Mutex, RwLock,
         },
         thread::{sleep, Builder, JoinHandle},
@@ -379,7 +379,7 @@ impl Validator {
                 .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
         }
-        let accounts_package_channel = channel();
+        let (replay_vote_sender, replay_vote_receiver) = unbounded();
         let (
             genesis_config,
             bank_forks,
@@ -387,7 +387,6 @@ impl Validator {
             ledger_signal_receiver,
             completed_slots_receiver,
             leader_schedule_cache,
-            last_full_snapshot_slot,
             snapshot_hash,
             TransactionHistoryServices {
                 transaction_status_sender,
@@ -409,7 +408,6 @@ impl Validator {
             config.enforce_ulimit_nofile,
             &start_progress,
             config.no_poh_speed_test,
-            accounts_package_channel.0.clone(),
         );
         *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
@@ -709,7 +707,6 @@ impl Validator {
         let rpc_completed_slots_service =
             RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone());
-        let (replay_vote_sender, replay_vote_receiver) = unbounded();
         let tvu = Tvu::new(
             vote_account,
             authorized_voter_keypairs,
@@ -780,8 +777,6 @@ impl Validator {
             },
             &max_slots,
             &cost_model,
-            accounts_package_channel,
-            last_full_snapshot_slot,
         );
         let tpu = Tpu::new(
@@ -1074,7 +1069,7 @@ fn post_process_restored_tower(
     })
 }

-#[allow(clippy::type_complexity, clippy::too_many_arguments)]
+#[allow(clippy::type_complexity)]
 fn new_banks_from_ledger(
     validator_identity: &Pubkey,
     vote_account: &Pubkey,
@@ -1085,7 +1080,6 @@ fn new_banks_from_ledger(
     enforce_ulimit_nofile: bool,
     start_progress: &Arc<RwLock<ValidatorStartProgress>>,
     no_poh_speed_test: bool,
-    accounts_package_sender: AccountsPackageSender,
 ) -> (
     GenesisConfig,
     BankForks,
@@ -1093,7 +1087,6 @@ fn new_banks_from_ledger(
     Receiver<bool>,
     CompletedSlotsReceiver,
     LeaderScheduleCache,
-    Option<Slot>,
     Option<(Slot, Hash)>,
     TransactionHistoryServices,
     Tower,
@@ -1189,26 +1182,24 @@ fn new_banks_from_ledger(
         TransactionHistoryServices::default()
     };
-    let (mut bank_forks, mut leader_schedule_cache, last_full_snapshot_slot, snapshot_hash) =
-        bank_forks_utils::load(
-            &genesis_config,
-            &blockstore,
-            config.account_paths.clone(),
-            config.account_shrink_paths.clone(),
-            config.snapshot_config.as_ref(),
-            process_options,
-            transaction_history_services
-                .transaction_status_sender
-                .as_ref(),
-            transaction_history_services
-                .cache_block_meta_sender
-                .as_ref(),
-            accounts_package_sender,
-        )
-        .unwrap_or_else(|err| {
-            error!("Failed to load ledger: {:?}", err);
-            abort()
-        });
+    let (mut bank_forks, mut leader_schedule_cache, snapshot_hash) = bank_forks_utils::load(
+        &genesis_config,
+        &blockstore,
+        config.account_paths.clone(),
+        config.account_shrink_paths.clone(),
+        config.snapshot_config.as_ref(),
+        process_options,
+        transaction_history_services
+            .transaction_status_sender
+            .as_ref(),
+        transaction_history_services
+            .cache_block_meta_sender
+            .as_ref(),
+    )
+    .unwrap_or_else(|err| {
+        error!("Failed to load ledger: {:?}", err);
+        abort()
+    });
     if let Some(warp_slot) = config.warp_slot {
         let snapshot_config = config.snapshot_config.as_ref().unwrap_or_else(|| {
@@ -1286,7 +1277,6 @@ fn new_banks_from_ledger(
         ledger_signal_receiver,
         completed_slots_receiver,
         leader_schedule_cache,
-        last_full_snapshot_slot,
         snapshot_hash,
         transaction_history_services,
         tower,

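The Tvu::new() and Validator::new() hunks above undo a plumbing change: before the revert the validator created the accounts-package channel, cloned its sender into new_banks_from_ledger(), and passed the whole (sender, receiver) pair into Tvu::new(); after the revert the channel is again created inside Tvu::new() and only the sender is handed to the snapshot request handler. A minimal, self-contained sketch of the two ownership styles, using plain std mpsc types rather than the Solana ones:

use std::sync::mpsc::{channel, Receiver, Sender};

struct Worker {
    sender: Sender<u64>,
    receiver: Receiver<u64>,
}

// "Before" style: the caller owns the channel, so it can clone the sender for
// other components before handing the pair to the worker.
fn new_with_injected_channel(chan: (Sender<u64>, Receiver<u64>)) -> Worker {
    let (sender, receiver) = chan;
    Worker { sender, receiver }
}

// "After" style: the channel is an internal detail of the worker.
fn new_with_internal_channel() -> Worker {
    let (sender, receiver) = channel();
    Worker { sender, receiver }
}

fn main() {
    let injected = new_with_injected_channel(channel());
    injected.sender.send(1).unwrap();
    assert_eq!(injected.receiver.recv().unwrap(), 1);

    let internal = new_with_internal_channel();
    internal.sender.send(2).unwrap();
    assert_eq!(internal.receiver.recv().unwrap(), 2);
}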
View File

@@ -819,7 +819,7 @@ mod tests {
         accounts_dir: PathBuf,
         genesis_config: &GenesisConfig,
     ) -> snapshot_utils::Result<()> {
-        let (deserialized_bank, ..) = snapshot_utils::bank_from_latest_snapshot_archives(
+        let (deserialized_bank, _) = snapshot_utils::bank_from_latest_snapshot_archives(
             &snapshot_config.bank_snapshots_dir,
             &snapshot_config.snapshot_archives_dir,
             &[accounts_dir],
@@ -997,7 +997,7 @@ mod tests {
         std::thread::sleep(Duration::from_secs(5));
         info!("Awake! Rebuilding bank from latest snapshot archives...");
-        let (deserialized_bank, ..) = snapshot_utils::bank_from_latest_snapshot_archives(
+        let (deserialized_bank, _) = snapshot_utils::bank_from_latest_snapshot_archives(
             &snapshot_test_config.snapshot_config.bank_snapshots_dir,
             &snapshot_test_config.snapshot_config.snapshot_archives_dir,
             &[snapshot_test_config.accounts_dir.as_ref().to_path_buf()],

View File

@@ -63,7 +63,7 @@ use std::{
     path::{Path, PathBuf},
     process::{exit, Command, Stdio},
     str::FromStr,
-    sync::{mpsc::channel, Arc, RwLock},
+    sync::{Arc, RwLock},
 };

 mod bigtable;
@@ -712,7 +712,7 @@ fn load_bank_forks(
         let snapshot_archives_dir =
             snapshot_archive_path.unwrap_or_else(|| blockstore.ledger_path().to_path_buf());
         Some(SnapshotConfig {
-            full_snapshot_archive_interval_slots: Slot::MAX,
+            full_snapshot_archive_interval_slots: 0, // Value doesn't matter
             incremental_snapshot_archive_interval_slots: Slot::MAX,
             snapshot_archives_dir,
             bank_snapshots_dir,
@@ -740,7 +740,6 @@ fn load_bank_forks(
         vec![non_primary_accounts_path]
     };
-    let (accounts_package_sender, _) = channel();
     bank_forks_utils::load(
         genesis_config,
         blockstore,
@@ -750,7 +749,6 @@ fn load_bank_forks(
         process_options,
         None,
         None,
-        accounts_package_sender,
     )
 }
@@ -1654,7 +1652,7 @@ fn main() {
                 process_options,
                 snapshot_archive_path,
             ) {
-                Ok((bank_forks, ..)) => {
+                Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
                     println!(
                         "{}",
                         compute_shred_version(
@@ -1729,7 +1727,7 @@ fn main() {
                 process_options,
                 snapshot_archive_path,
             ) {
-                Ok((bank_forks, ..)) => {
+                Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
                     println!("{}", &bank_forks.working_bank().hash());
                 }
                 Err(err) => {
@@ -1910,7 +1908,7 @@ fn main() {
                 AccessType::TryPrimaryThenSecondary,
                 wal_recovery_mode,
             );
-            let (bank_forks, ..) = load_bank_forks(
+            let (bank_forks, _, _) = load_bank_forks(
                 arg_matches,
                 &open_genesis_config_by(&ledger_path, arg_matches),
                 &blockstore,
@@ -1949,7 +1947,7 @@ fn main() {
                 process_options,
                 snapshot_archive_path,
             ) {
-                Ok((bank_forks, ..)) => {
+                Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
                     let dot = graph_forks(&bank_forks, arg_matches.is_present("include_all_votes"));
                     let extension = Path::new(&output_file).extension();
@@ -2051,7 +2049,7 @@ fn main() {
                 },
                 snapshot_archive_path,
             ) {
-                Ok((bank_forks, ..)) => {
+                Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
                     let mut bank = bank_forks
                         .get(snapshot_slot)
                         .unwrap_or_else(|| {
@@ -2281,7 +2279,7 @@ fn main() {
                 process_options,
                 snapshot_archive_path,
             ) {
-                Ok((bank_forks, ..)) => {
+                Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
                     let slot = bank_forks.working_bank().slot();
                     let bank = bank_forks.get(slot).unwrap_or_else(|| {
                         eprintln!("Error: Slot {} is not available", slot);
@@ -2340,7 +2338,7 @@ fn main() {
                 process_options,
                 snapshot_archive_path,
             ) {
-                Ok((bank_forks, ..)) => {
+                Ok((bank_forks, _leader_schedule_cache, _snapshot_hash)) => {
                     let slot = bank_forks.working_bank().slot();
                     let bank = bank_forks.get(slot).unwrap_or_else(|| {
                         eprintln!("Error: Slot {} is not available", slot);

View File

@@ -8,20 +8,12 @@ use crate::{
 };
 use log::*;
 use solana_entry::entry::VerifyRecyclers;
-use solana_runtime::{
-    bank_forks::BankForks, snapshot_archive_info::SnapshotArchiveInfoGetter,
-    snapshot_config::SnapshotConfig, snapshot_package::AccountsPackageSender, snapshot_utils,
-};
+use solana_runtime::{bank_forks::BankForks, snapshot_config::SnapshotConfig, snapshot_utils};
 use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash};
 use std::{fs, path::PathBuf, process, result};

 pub type LoadResult = result::Result<
-    (
-        BankForks,
-        LeaderScheduleCache,
-        Option<Slot>,
-        Option<(Slot, Hash)>,
-    ),
+    (BankForks, LeaderScheduleCache, Option<(Slot, Hash)>),
     BlockstoreProcessorError,
 >;
@@ -29,16 +21,9 @@ fn to_loadresult(
     bpr: BlockstoreProcessorResult,
     snapshot_slot_and_hash: Option<(Slot, Hash)>,
 ) -> LoadResult {
-    bpr.map(
-        |(bank_forks, leader_schedule_cache, last_full_snapshot_slot)| {
-            (
-                bank_forks,
-                leader_schedule_cache,
-                last_full_snapshot_slot,
-                snapshot_slot_and_hash,
-            )
-        },
-    )
+    bpr.map(|(bank_forks, leader_schedule_cache)| {
+        (bank_forks, leader_schedule_cache, snapshot_slot_and_hash)
+    })
 }

 /// Load the banks and accounts
@@ -54,7 +39,6 @@ pub fn load(
     process_options: ProcessOptions,
     transaction_status_sender: Option<&TransactionStatusSender>,
     cache_block_meta_sender: Option<&CacheBlockMetaSender>,
-    accounts_package_sender: AccountsPackageSender,
 ) -> LoadResult {
     if let Some(snapshot_config) = snapshot_config {
         info!(
@@ -79,7 +63,6 @@ pub fn load(
             process_options,
             transaction_status_sender,
             cache_block_meta_sender,
-            accounts_package_sender,
         );
     } else {
         info!("No snapshot package available; will load from genesis");
@@ -94,8 +77,6 @@ pub fn load(
         account_paths,
         process_options,
         cache_block_meta_sender,
-        snapshot_config,
-        accounts_package_sender,
     )
 }
@@ -105,8 +86,6 @@ fn load_from_genesis(
     account_paths: Vec<PathBuf>,
     process_options: ProcessOptions,
     cache_block_meta_sender: Option<&CacheBlockMetaSender>,
-    snapshot_config: Option<&SnapshotConfig>,
-    accounts_package_sender: AccountsPackageSender,
 ) -> LoadResult {
     info!("Processing ledger from genesis");
     to_loadresult(
@@ -116,8 +95,6 @@ fn load_from_genesis(
             account_paths,
             process_options,
             cache_block_meta_sender,
-            snapshot_config,
-            accounts_package_sender,
         ),
         None,
     )
@@ -133,7 +110,6 @@ fn load_from_snapshot(
     process_options: ProcessOptions,
     transaction_status_sender: Option<&TransactionStatusSender>,
     cache_block_meta_sender: Option<&CacheBlockMetaSender>,
-    accounts_package_sender: AccountsPackageSender,
 ) -> LoadResult {
     // Fail hard here if snapshot fails to load, don't silently continue
     if account_paths.is_empty() {
@@ -141,25 +117,24 @@ fn load_from_snapshot(
         process::exit(1);
     }
-    let (deserialized_bank, timings, full_snapshot_archive_info, _) =
-        snapshot_utils::bank_from_latest_snapshot_archives(
-            &snapshot_config.bank_snapshots_dir,
-            &snapshot_config.snapshot_archives_dir,
-            &account_paths,
-            &process_options.frozen_accounts,
-            genesis_config,
-            process_options.debug_keys.clone(),
-            Some(&crate::builtins::get(process_options.bpf_jit)),
-            process_options.account_indexes.clone(),
-            process_options.accounts_db_caching_enabled,
-            process_options.limit_load_slot_count_from_snapshot,
-            process_options.shrink_ratio,
-            process_options.accounts_db_test_hash_calculation,
-            process_options.accounts_db_skip_shrink,
-            process_options.verify_index,
-            process_options.accounts_index_config,
-        )
-        .expect("Load from snapshot failed");
+    let (deserialized_bank, timings) = snapshot_utils::bank_from_latest_snapshot_archives(
+        &snapshot_config.bank_snapshots_dir,
+        &snapshot_config.snapshot_archives_dir,
+        &account_paths,
+        &process_options.frozen_accounts,
+        genesis_config,
+        process_options.debug_keys.clone(),
+        Some(&crate::builtins::get(process_options.bpf_jit)),
+        process_options.account_indexes.clone(),
+        process_options.accounts_db_caching_enabled,
+        process_options.limit_load_slot_count_from_snapshot,
+        process_options.shrink_ratio,
+        process_options.accounts_db_test_hash_calculation,
+        process_options.accounts_db_skip_shrink,
+        process_options.verify_index,
+        process_options.accounts_index_config,
+    )
+    .expect("Load from snapshot failed");
     let deserialized_bank_slot_and_hash = (
         deserialized_bank.slot(),
@@ -178,10 +153,7 @@ fn load_from_snapshot(
             &VerifyRecyclers::default(),
             transaction_status_sender,
             cache_block_meta_sender,
-            Some(snapshot_config),
-            accounts_package_sender,
             timings,
-            full_snapshot_archive_info.slot(),
         ),
         Some(deserialized_bank_slot_and_hash),
     )

View File

@@ -25,9 +25,7 @@ use solana_runtime::{
     bank_forks::BankForks,
     bank_utils,
     commitment::VOTE_THRESHOLD_SIZE,
-    snapshot_config::SnapshotConfig,
-    snapshot_package::{AccountsPackageSender, SnapshotType},
-    snapshot_utils::{self, BankFromArchiveTimings},
+    snapshot_utils::BankFromArchiveTimings,
     transaction_batch::TransactionBatch,
     vote_account::VoteAccount,
     vote_sender_types::ReplayVoteSender,
@@ -45,6 +43,7 @@ use solana_sdk::{
 use solana_transaction_status::token_balances::{
     collect_token_balances, TransactionTokenBalancesSet,
 };
+
 use std::{
     cell::RefCell,
     collections::{HashMap, HashSet},
@@ -84,7 +83,7 @@ impl BlockCostCapacityMeter {
     }
 }

-pub type BlockstoreProcessorInner = (BankForks, LeaderScheduleCache, Option<Slot>);
+pub type BlockstoreProcessorInner = (BankForks, LeaderScheduleCache);

 pub type BlockstoreProcessorResult =
     result::Result<BlockstoreProcessorInner, BlockstoreProcessorError>;
@@ -481,8 +480,6 @@ pub fn process_blockstore(
     account_paths: Vec<PathBuf>,
     opts: ProcessOptions,
     cache_block_meta_sender: Option<&CacheBlockMetaSender>,
-    snapshot_config: Option<&SnapshotConfig>,
-    accounts_package_sender: AccountsPackageSender,
 ) -> BlockstoreProcessorResult {
     if let Some(num_threads) = opts.override_num_threads {
         PAR_THREAD_POOL.with(|pool| {
@@ -523,15 +520,11 @@ pub fn process_blockstore(
         &recyclers,
         None,
         cache_block_meta_sender,
-        snapshot_config,
-        accounts_package_sender,
         BankFromArchiveTimings::default(),
-        None,
     )
 }

-/// Process blockstore from a known root bank
-#[allow(clippy::too_many_arguments)]
+// Process blockstore from a known root bank
 pub(crate) fn process_blockstore_from_root(
     blockstore: &Blockstore,
     bank: Bank,
@@ -539,10 +532,7 @@ pub(crate) fn process_blockstore_from_root(
     recyclers: &VerifyRecyclers,
     transaction_status_sender: Option<&TransactionStatusSender>,
     cache_block_meta_sender: Option<&CacheBlockMetaSender>,
-    snapshot_config: Option<&SnapshotConfig>,
-    accounts_package_sender: AccountsPackageSender,
     timings: BankFromArchiveTimings,
-    last_full_snapshot_slot: Slot,
 ) -> BlockstoreProcessorResult {
     do_process_blockstore_from_root(
         blockstore,
@@ -551,14 +541,10 @@ pub(crate) fn process_blockstore_from_root(
         recyclers,
         transaction_status_sender,
         cache_block_meta_sender,
-        snapshot_config,
-        accounts_package_sender,
         timings,
-        Some(last_full_snapshot_slot),
     )
 }

-#[allow(clippy::too_many_arguments)]
 fn do_process_blockstore_from_root(
     blockstore: &Blockstore,
     bank: Arc<Bank>,
@@ -566,10 +552,7 @@ fn do_process_blockstore_from_root(
     recyclers: &VerifyRecyclers,
     transaction_status_sender: Option<&TransactionStatusSender>,
     cache_block_meta_sender: Option<&CacheBlockMetaSender>,
-    snapshot_config: Option<&SnapshotConfig>,
-    accounts_package_sender: AccountsPackageSender,
     timings: BankFromArchiveTimings,
-    mut last_full_snapshot_slot: Option<Slot>,
 ) -> BlockstoreProcessorResult {
     info!("processing ledger from slot {}...", bank.slot());
@@ -631,10 +614,7 @@ fn do_process_blockstore_from_root(
         recyclers,
         transaction_status_sender,
         cache_block_meta_sender,
-        snapshot_config,
-        accounts_package_sender,
         &mut timing,
-        &mut last_full_snapshot_slot,
     )?;
     initial_forks.sort_by_key(|bank| bank.slot());
@@ -650,7 +630,6 @@ fn do_process_blockstore_from_root(
     if initial_forks.is_empty() {
         return Err(BlockstoreProcessorError::NoValidForksFound);
     }
-
     let bank_forks = BankForks::new_from_banks(&initial_forks, root);
     let processing_time = now.elapsed();
@@ -718,7 +697,7 @@ fn do_process_blockstore_from_root(
     );
     assert!(bank_forks.active_banks().is_empty());
-    Ok((bank_forks, leader_schedule_cache, last_full_snapshot_slot))
+    Ok((bank_forks, leader_schedule_cache))
 }

 /// Verify that a segment of entries has the correct number of ticks and hashes
@@ -1059,10 +1038,7 @@ fn load_frozen_forks(
     recyclers: &VerifyRecyclers,
     transaction_status_sender: Option<&TransactionStatusSender>,
     cache_block_meta_sender: Option<&CacheBlockMetaSender>,
-    snapshot_config: Option<&SnapshotConfig>,
-    accounts_package_sender: AccountsPackageSender,
     timing: &mut ExecuteTimings,
-    last_full_snapshot_slot: &mut Option<Slot>,
 ) -> result::Result<Vec<Arc<Bank>>, BlockstoreProcessorError> {
     let mut initial_forks = HashMap::new();
     let mut all_banks = HashMap::new();
@@ -1078,7 +1054,6 @@ fn load_frozen_forks(
         "load_frozen_forks() latest root from blockstore: {}, max_root: {}",
         blockstore_max_root, max_root,
     );
-
     process_next_slots(
         root_bank,
         root_meta,
@@ -1186,35 +1161,11 @@ fn load_frozen_forks(
                     leader_schedule_cache.set_root(new_root_bank);
                     new_root_bank.squash();
-                    if let Some(snapshot_config) = snapshot_config {
-                        let block_height = new_root_bank.block_height();
-                        if block_height % snapshot_config.full_snapshot_archive_interval_slots == 0 {
-                            snapshot_utils::snapshot_bank(
-                                new_root_bank,
-                                new_root_bank.src.slot_deltas(&new_root_bank.src.roots()),
-                                &accounts_package_sender,
-                                &snapshot_config.bank_snapshots_dir,
-                                &snapshot_config.snapshot_archives_dir,
-                                snapshot_config.snapshot_version,
-                                snapshot_config.archive_format,
-                                None,
-                                Some(SnapshotType::FullSnapshot),
-                            )
-                            .expect("Failed to snapshot bank while loading frozen banks");
-                            trace!(
-                                "took bank snapshot for new root bank, block height: {}, slot: {}",
-                                block_height,
-                                *root
-                            );
-                            *last_full_snapshot_slot = Some(*root);
-                        }
-                    }
                     if last_free.elapsed() > Duration::from_secs(10) {
                         // Must be called after `squash()`, so that AccountsDb knows what
                         // the roots are for the cache flushing in exhaustively_free_unused_resource().
                         // This could take few secs; so update last_free later
-                        new_root_bank.exhaustively_free_unused_resource(*last_full_snapshot_slot);
+                        new_root_bank.exhaustively_free_unused_resource(None);
                         last_free = Instant::now();
                     }
@@ -1463,9 +1414,8 @@ pub mod tests {
     use matches::assert_matches;
     use rand::{thread_rng, Rng};
     use solana_entry::entry::{create_ticks, next_entry, next_entry_mut};
-    use solana_runtime::{
-        genesis_utils::{self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs},
-        snapshot_utils::{ArchiveFormat, SnapshotVersion},
+    use solana_runtime::genesis_utils::{
+        self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
     };
     use solana_sdk::{
         account::{AccountSharedData, WritableAccount},
@@ -1482,11 +1432,7 @@ pub mod tests {
         vote_state::{VoteState, VoteStateVersions, MAX_LOCKOUT_HISTORY},
         vote_transaction,
     };
-    use std::{
-        collections::BTreeSet,
-        sync::{mpsc::channel, RwLock},
-    };
-    use tempfile::TempDir;
+    use std::{collections::BTreeSet, sync::RwLock};
     use trees::tr;

     fn test_process_blockstore(
@@ -1494,17 +1440,7 @@ pub mod tests {
         blockstore: &Blockstore,
         opts: ProcessOptions,
     ) -> BlockstoreProcessorInner {
-        let (accounts_package_sender, _) = channel();
-        process_blockstore(
-            genesis_config,
-            blockstore,
-            Vec::new(),
-            opts,
-            None,
-            None,
-            accounts_package_sender,
-        )
-        .unwrap()
+        process_blockstore(genesis_config, blockstore, Vec::new(), opts, None).unwrap()
     }

     #[test]
@@ -2292,7 +2228,7 @@ pub mod tests {
             accounts_db_test_hash_calculation: true,
             ..ProcessOptions::default()
         };
-        let (_bank_forks, leader_schedule, _) =
+        let (_bank_forks, leader_schedule) =
             test_process_blockstore(&genesis_config, &blockstore, opts);
         assert_eq!(leader_schedule.max_schedules(), std::usize::MAX);
     }
@@ -3069,18 +3005,14 @@ pub mod tests {
         bank1.squash();

         // Test process_blockstore_from_root() from slot 1 onwards
-        let (accounts_package_sender, _) = channel();
-        let (bank_forks, ..) = do_process_blockstore_from_root(
+        let (bank_forks, _leader_schedule) = do_process_blockstore_from_root(
             &blockstore,
             bank1,
             &opts,
             &recyclers,
             None,
             None,
-            None,
-            accounts_package_sender,
             BankFromArchiveTimings::default(),
-            None,
         )
         .unwrap();
@@ -3102,120 +3034,6 @@ pub mod tests {
         verify_fork_infos(&bank_forks);
     }
-
-    /// Test that processing the blockstore is aware of incremental snapshots. When processing the
-    /// blockstore from a root, like what happens when loading from a snapshot, there may be new
-    /// roots that cross a full snapshot interval. In these cases, a bank snapshot must be taken,
-    /// so that a full snapshot archive is created and available by the time the background
-    /// services spin up.
-    ///
-    /// For this test, process enough roots to cross the full snapshot interval multiple times.
-    /// Ensure afterwards that the snapshots were created.
-    #[test]
-    fn test_process_blockstore_from_root_with_snapshots() {
-        solana_logger::setup();
-
-        let GenesisConfigInfo {
-            mut genesis_config, ..
-        } = create_genesis_config(123);
-
-        let ticks_per_slot = 1;
-        genesis_config.ticks_per_slot = ticks_per_slot;
-        let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_config);
-        let blockstore = Blockstore::open(&ledger_path).unwrap();
-
-        const ROOT_INTERVAL_SLOTS: Slot = 2;
-        const FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: Slot = ROOT_INTERVAL_SLOTS * 5;
-        const LAST_SLOT: Slot = FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS * 4;
-
-        let mut last_hash = blockhash;
-        for i in 1..=LAST_SLOT {
-            last_hash =
-                fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, i, i - 1, last_hash);
-        }
-
-        let roots_to_set = (0..=LAST_SLOT)
-            .step_by(ROOT_INTERVAL_SLOTS as usize)
-            .collect_vec();
-        blockstore.set_roots(roots_to_set.iter()).unwrap();
-
-        // Set up bank1
-        let bank0 = Arc::new(Bank::new_for_tests(&genesis_config));
-        let opts = ProcessOptions {
-            poh_verify: true,
-            accounts_db_test_hash_calculation: true,
-            ..ProcessOptions::default()
-        };
-        let recyclers = VerifyRecyclers::default();
-        process_bank_0(&bank0, &blockstore, &opts, &recyclers, None);
-        let slot_start_processing = 1;
-        let bank = Arc::new(Bank::new_from_parent(
-            &bank0,
-            &Pubkey::default(),
-            slot_start_processing,
-        ));
-        confirm_full_slot(
-            &blockstore,
-            &bank,
-            &opts,
-            &recyclers,
-            &mut ConfirmationProgress::new(bank0.last_blockhash()),
-            None,
-            None,
-            &mut ExecuteTimings::default(),
-        )
-        .unwrap();
-        bank.squash();
-
-        let bank_snapshots_tempdir = TempDir::new().unwrap();
-        let snapshot_config = SnapshotConfig {
-            full_snapshot_archive_interval_slots: FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
-            incremental_snapshot_archive_interval_slots: Slot::MAX, // value does not matter
-            snapshot_archives_dir: PathBuf::default(),              // value does not matter
-            bank_snapshots_dir: bank_snapshots_tempdir.path().to_path_buf(),
-            archive_format: ArchiveFormat::TarZstd,       // value does not matter
-            snapshot_version: SnapshotVersion::default(), // value does not matter
-            maximum_snapshots_to_retain: usize::MAX,      // value does not matter
-        };
-        let (accounts_package_sender, accounts_package_receiver) = channel();
-
-        do_process_blockstore_from_root(
-            &blockstore,
-            bank,
-            &opts,
-            &recyclers,
-            None,
-            None,
-            Some(&snapshot_config),
-            accounts_package_sender.clone(),
-            BankFromArchiveTimings::default(),
-            None,
-        )
-        .unwrap();
-
-        // The `drop()` is necessary here in order to call `.iter()` on the channel below
-        drop(accounts_package_sender);
-
-        // Ensure all the AccountsPackages were created and sent to the AccountsPackageReceiver
-        let received_accounts_package_slots = accounts_package_receiver
-            .iter()
-            .map(|accounts_package| accounts_package.slot)
-            .collect::<Vec<_>>();
-        let expected_slots = (slot_start_processing..=LAST_SLOT)
-            .filter(|slot| slot % FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS == 0)
-            .collect::<Vec<_>>();
-        assert_eq!(received_accounts_package_slots, expected_slots);
-
-        // Ensure all the bank snapshots were created
-        let bank_snapshots = snapshot_utils::get_bank_snapshots(&bank_snapshots_tempdir);
-        let mut bank_snapshot_slots = bank_snapshots
-            .into_iter()
-            .map(|bank_snapshot| bank_snapshot.slot)
-            .collect::<Vec<_>>();
-        bank_snapshot_slots.sort_unstable();
-        assert_eq!(bank_snapshot_slots, expected_slots);
-    }

     #[test]
     #[ignore]
     fn test_process_entries_stress() {

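The block removed from load_frozen_forks() above took a full bank snapshot whenever a newly-rooted bank's block height landed on the configured full_snapshot_archive_interval_slots, which is what the deleted test exercised with an interval of ten slots. A minimal, self-contained sketch of that scheduling rule (not the Solana code itself; the zero-interval guard is only here to keep the modulo well-defined):

// Decide whether a new root at `block_height` should trigger a full bank snapshot.
fn should_take_full_snapshot(block_height: u64, full_snapshot_interval_slots: u64) -> bool {
    // The removed code only ran when a snapshot config was present; treat a
    // zero interval as "never" so the remainder operation cannot divide by zero.
    full_snapshot_interval_slots != 0 && block_height % full_snapshot_interval_slots == 0
}

fn main() {
    // With the removed test's constants (interval = 10), roots at block heights
    // 10, 20, 30, ... would trigger a snapshot; other heights would not.
    assert!(should_take_full_snapshot(20, 10));
    assert!(!should_take_full_snapshot(21, 10));
    println!("interval check behaves as expected");
}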
View File

@@ -836,12 +836,7 @@ pub fn bank_from_latest_snapshot_archives(
     accounts_db_skip_shrink: bool,
     verify_index: bool,
     accounts_index_config: Option<AccountsIndexConfig>,
-) -> Result<(
-    Bank,
-    BankFromArchiveTimings,
-    FullSnapshotArchiveInfo,
-    Option<IncrementalSnapshotArchiveInfo>,
-)> {
+) -> Result<(Bank, BankFromArchiveTimings)> {
     let full_snapshot_archive_info = get_highest_full_snapshot_archive_info(&snapshot_archives_dir)
         .ok_or(SnapshotError::NoSnapshotArchives)?;
@@ -893,12 +888,7 @@ pub fn bank_from_latest_snapshot_archives(
         ),
     )?;
-    Ok((
-        bank,
-        timings,
-        full_snapshot_archive_info,
-        incremental_snapshot_archive_info,
-    ))
+    Ok((bank, timings))
 }

 /// Check to make sure the deserialized bank's slot and hash matches the snapshot archive's slot
@@ -2723,7 +2713,7 @@ mod tests {
         )
         .unwrap();
-        let (deserialized_bank, ..) = bank_from_latest_snapshot_archives(
+        let (deserialized_bank, _) = bank_from_latest_snapshot_archives(
             &bank_snapshots_dir,
             &snapshot_archives_dir,
             &[accounts_dir.as_ref().to_path_buf()],