From e03826b6d89085e58a6c3600e570d4209f45e906 Mon Sep 17 00:00:00 2001 From: Dmitri Makarov Date: Thu, 27 Apr 2023 14:11:04 -0400 Subject: [PATCH] Refactor ledger-tool moving some of functions to a separate module (#31359) * Refactor ledger-tool moving some of functions to a separate module * Rename utils to ledger_utils, move arg parsing funcs to a new module --- ledger-tool/src/args.rs | 67 +++++ ledger-tool/src/ledger_utils.rs | 442 ++++++++++++++++++++++++++++ ledger-tool/src/main.rs | 492 +------------------------------- 3 files changed, 522 insertions(+), 479 deletions(-) create mode 100644 ledger-tool/src/args.rs create mode 100644 ledger-tool/src/ledger_utils.rs diff --git a/ledger-tool/src/args.rs b/ledger-tool/src/args.rs new file mode 100644 index 000000000..6d1753d47 --- /dev/null +++ b/ledger-tool/src/args.rs @@ -0,0 +1,67 @@ +use { + clap::{value_t, values_t_or_exit, ArgMatches}, + solana_runtime::{ + accounts_db::{AccountsDb, AccountsDbConfig, FillerAccountsConfig}, + accounts_index::{AccountsIndexConfig, IndexLimitMb}, + }, + solana_sdk::clock::Slot, + std::path::{Path, PathBuf}, +}; + +// Build an `AccountsDbConfig` from subcommand arguments. All of the arguments +// matched by this functional are either optional or have a default value. +// Thus, a subcommand need not support all of the arguments that are matched +// by this function. +pub fn get_accounts_db_config( + ledger_path: &Path, + arg_matches: &ArgMatches<'_>, +) -> AccountsDbConfig { + let accounts_index_bins = value_t!(arg_matches, "accounts_index_bins", usize).ok(); + let accounts_index_index_limit_mb = + if let Some(limit) = value_t!(arg_matches, "accounts_index_memory_limit_mb", usize).ok() { + IndexLimitMb::Limit(limit) + } else if arg_matches.is_present("disable_accounts_disk_index") { + IndexLimitMb::InMemOnly + } else { + IndexLimitMb::Unspecified + }; + let accounts_index_drives: Vec = if arg_matches.is_present("accounts_index_path") { + values_t_or_exit!(arg_matches, "accounts_index_path", String) + .into_iter() + .map(PathBuf::from) + .collect() + } else { + vec![ledger_path.join("accounts_index.ledger-tool")] + }; + let accounts_index_config = AccountsIndexConfig { + bins: accounts_index_bins, + index_limit_mb: accounts_index_index_limit_mb, + drives: Some(accounts_index_drives), + ..AccountsIndexConfig::default() + }; + + let filler_accounts_config = FillerAccountsConfig { + count: value_t!(arg_matches, "accounts_filler_count", usize).unwrap_or(0), + size: value_t!(arg_matches, "accounts_filler_size", usize).unwrap_or(0), + }; + + AccountsDbConfig { + index: Some(accounts_index_config), + accounts_hash_cache_path: Some(ledger_path.join(AccountsDb::ACCOUNTS_HASH_CACHE_DIR)), + filler_accounts_config, + ancient_append_vec_offset: value_t!(arg_matches, "accounts_db_ancient_append_vecs", i64) + .ok(), + exhaustively_verify_refcounts: arg_matches.is_present("accounts_db_verify_refcounts"), + skip_initial_hash_calc: arg_matches.is_present("accounts_db_skip_initial_hash_calculation"), + ..AccountsDbConfig::default() + } +} + +// This function is duplicated in validator/src/main.rs... +pub fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option> { + if matches.is_present(name) { + Some(values_t_or_exit!(matches, name, Slot)) + } else { + None + } +} diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs new file mode 100644 index 000000000..9f2966654 --- /dev/null +++ b/ledger-tool/src/ledger_utils.rs @@ -0,0 +1,442 @@ +use { + clap::{value_t, value_t_or_exit, values_t_or_exit, ArgMatches}, + crossbeam_channel::unbounded, + log::*, + solana_core::{ + accounts_hash_verifier::AccountsHashVerifier, validator::BlockVerificationMethod, + }, + solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService, + solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, + solana_ledger::{ + bank_forks_utils, + blockstore::{Blockstore, BlockstoreError}, + blockstore_options::{ + AccessType, BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions, + ShredStorageType, + }, + blockstore_processor::{ + self, BlockstoreProcessorError, ProcessOptions, TransactionStatusSender, + }, + }, + solana_measure::measure::Measure, + solana_rpc::{ + transaction_notifier_interface::TransactionNotifierLock, + transaction_status_service::TransactionStatusService, + }, + solana_runtime::{ + accounts_background_service::{ + AbsRequestHandlers, AbsRequestSender, AccountsBackgroundService, + PrunedBanksRequestHandler, SnapshotRequestHandler, + }, + accounts_update_notifier_interface::AccountsUpdateNotifier, + bank_forks::BankForks, + hardened_unpack::open_genesis_config, + snapshot_config::SnapshotConfig, + snapshot_hash::StartingSnapshotHashes, + snapshot_utils::{ + self, clean_orphaned_account_snapshot_dirs, create_all_accounts_run_and_snapshot_dirs, + move_and_async_delete_path, + }, + }, + solana_sdk::{ + genesis_config::GenesisConfig, signature::Signer, signer::keypair::Keypair, + timing::timestamp, + }, + solana_streamer::socket::SocketAddrSpace, + std::{ + path::{Path, PathBuf}, + process::exit, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + }, +}; + +pub fn get_shred_storage_type(ledger_path: &Path, message: &str) -> ShredStorageType { + // TODO: the following shred_storage_type inference must be updated once + // the rocksdb options can be constructed via load_options_file() as the + // value picked by passing None for `max_shred_storage_size` could affect + // the persisted rocksdb options file. + match ShredStorageType::from_ledger_path(ledger_path, None) { + Some(s) => s, + None => { + info!("{}", message); + ShredStorageType::RocksLevel + } + } +} + +pub fn load_bank_forks( + arg_matches: &ArgMatches, + genesis_config: &GenesisConfig, + blockstore: Arc, + process_options: ProcessOptions, + snapshot_archive_path: Option, + incremental_snapshot_archive_path: Option, +) -> Result<(Arc>, Option), BlockstoreProcessorError> { + let bank_snapshots_dir = blockstore + .ledger_path() + .join(if blockstore.is_primary_access() { + "snapshot" + } else { + "snapshot.ledger-tool" + }); + + let mut starting_slot = 0; // default start check with genesis + let snapshot_config = if arg_matches.is_present("no_snapshot") { + None + } else { + let full_snapshot_archives_dir = + snapshot_archive_path.unwrap_or_else(|| blockstore.ledger_path().to_path_buf()); + let incremental_snapshot_archives_dir = + incremental_snapshot_archive_path.unwrap_or_else(|| full_snapshot_archives_dir.clone()); + if let Some(full_snapshot_slot) = + snapshot_utils::get_highest_full_snapshot_archive_slot(&full_snapshot_archives_dir) + { + let incremental_snapshot_slot = + snapshot_utils::get_highest_incremental_snapshot_archive_slot( + &incremental_snapshot_archives_dir, + full_snapshot_slot, + ) + .unwrap_or_default(); + starting_slot = std::cmp::max(full_snapshot_slot, incremental_snapshot_slot); + } + + Some(SnapshotConfig { + full_snapshot_archives_dir, + incremental_snapshot_archives_dir, + bank_snapshots_dir: bank_snapshots_dir.clone(), + ..SnapshotConfig::new_load_only() + }) + }; + + match process_options.halt_at_slot { + // Skip the following checks for sentinel values of Some(0) and None. + // For Some(0), no slots will be be replayed after starting_slot. + // For None, all available children of starting_slot will be replayed. + None | Some(0) => {} + Some(halt_slot) => { + if halt_slot < starting_slot { + eprintln!( + "Unable to load bank forks at slot {halt_slot} because it is less than the starting slot {starting_slot}. \ + The starting slot will be the latest snapshot slot, or genesis if --no-snapshot flag specified or no snapshots found." + ); + exit(1); + } + // Check if we have the slot data necessary to replay from starting_slot to >= halt_slot. + if !blockstore.slot_range_connected(starting_slot, halt_slot) { + eprintln!( + "Unable to load bank forks at slot {halt_slot} due to disconnected blocks.", + ); + exit(1); + } + } + } + + let account_paths = if let Some(account_paths) = arg_matches.value_of("account_paths") { + // If this blockstore access is Primary, no other process (solana-validator) can hold + // Primary access. So, allow a custom accounts path without worry of wiping the accounts + // of solana-validator. + if !blockstore.is_primary_access() { + // Attempt to open the Blockstore in Primary access; if successful, no other process + // was holding Primary so allow things to proceed with custom accounts path. Release + // the Primary access instead of holding it to give priority to solana-validator over + // solana-ledger-tool should solana-validator start before we've finished. + info!( + "Checking if another process currently holding Primary access to {:?}", + blockstore.ledger_path() + ); + if Blockstore::open_with_options( + blockstore.ledger_path(), + BlockstoreOptions { + access_type: AccessType::PrimaryForMaintenance, + ..BlockstoreOptions::default() + }, + ) + .is_err() + { + // Couldn't get Primary access, error out to be defensive. + eprintln!("Error: custom accounts path is not supported under secondary access"); + exit(1); + } + } + account_paths.split(',').map(PathBuf::from).collect() + } else if blockstore.is_primary_access() { + vec![blockstore.ledger_path().join("accounts")] + } else { + let non_primary_accounts_path = blockstore.ledger_path().join("accounts.ledger-tool"); + info!( + "Default accounts path is switched aligning with Blockstore's secondary access: {:?}", + non_primary_accounts_path + ); + vec![non_primary_accounts_path] + }; + + let (account_run_paths, account_snapshot_paths) = + create_all_accounts_run_and_snapshot_dirs(&account_paths).unwrap_or_else(|err| { + eprintln!("Error: {err:?}"); + exit(1); + }); + + // From now on, use run/ paths in the same way as the previous account_paths. + let account_paths = account_run_paths; + + info!("Cleaning contents of account paths: {:?}", account_paths); + let mut measure = Measure::start("clean_accounts_paths"); + account_paths.iter().for_each(|path| { + if path.exists() { + move_and_async_delete_path(path); + } + }); + measure.stop(); + info!("done. {}", measure); + + info!( + "Cleaning contents of account snapshot paths: {:?}", + account_snapshot_paths + ); + if let Err(e) = + clean_orphaned_account_snapshot_dirs(&bank_snapshots_dir, &account_snapshot_paths) + { + eprintln!("Failed to clean orphaned account snapshot dirs. Error: {e:?}"); + exit(1); + } + + let mut accounts_update_notifier = Option::::default(); + let mut transaction_notifier = Option::::default(); + if arg_matches.is_present("geyser_plugin_config") { + let geyser_config_files = values_t_or_exit!(arg_matches, "geyser_plugin_config", String) + .into_iter() + .map(PathBuf::from) + .collect::>(); + + let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded(); + drop(confirmed_bank_sender); + let geyser_service = + GeyserPluginService::new(confirmed_bank_receiver, &geyser_config_files).unwrap_or_else( + |err| { + eprintln!("Failed to setup Geyser service: {err:?}"); + exit(1); + }, + ); + accounts_update_notifier = geyser_service.get_accounts_update_notifier(); + transaction_notifier = geyser_service.get_transaction_notifier(); + } + + let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) = + bank_forks_utils::load_bank_forks( + genesis_config, + blockstore.as_ref(), + account_paths, + None, + snapshot_config.as_ref(), + &process_options, + None, + accounts_update_notifier, + &Arc::default(), + ); + let block_verification_method = value_t!( + arg_matches, + "block_verification_method", + BlockVerificationMethod + ) + .unwrap_or_default(); + info!( + "Using: block-verification-method: {}", + block_verification_method, + ); + + let exit = Arc::new(AtomicBool::new(false)); + let node_id = Arc::new(Keypair::new()); + let cluster_info = Arc::new(ClusterInfo::new( + ContactInfo::new_localhost(&node_id.pubkey(), timestamp()), + Arc::clone(&node_id), + SocketAddrSpace::Unspecified, + )); + let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded(); + let accounts_hash_verifier = AccountsHashVerifier::new( + accounts_package_sender.clone(), + accounts_package_receiver, + None, + exit.clone(), + cluster_info, + None, + false, + None, + SnapshotConfig::new_load_only(), + ); + let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded(); + let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender.clone()); + let snapshot_request_handler = SnapshotRequestHandler { + snapshot_config: SnapshotConfig::new_load_only(), + snapshot_request_sender, + snapshot_request_receiver, + accounts_package_sender, + }; + let pruned_banks_receiver = + AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone()); + let pruned_banks_request_handler = PrunedBanksRequestHandler { + pruned_banks_receiver, + }; + let abs_request_handler = AbsRequestHandlers { + snapshot_request_handler, + pruned_banks_request_handler, + }; + let accounts_background_service = AccountsBackgroundService::new( + bank_forks.clone(), + exit.clone(), + abs_request_handler, + process_options.accounts_db_test_hash_calculation, + None, + ); + + let (transaction_status_sender, transaction_status_service) = if transaction_notifier.is_some() + { + let (transaction_status_sender, transaction_status_receiver) = unbounded(); + let transaction_status_service = TransactionStatusService::new( + transaction_status_receiver, + Arc::default(), + false, + transaction_notifier, + blockstore.clone(), + false, + &exit, + ); + ( + Some(TransactionStatusSender { + sender: transaction_status_sender, + }), + Some(transaction_status_service), + ) + } else { + (None, None) + }; + + let result = blockstore_processor::process_blockstore_from_root( + blockstore.as_ref(), + &bank_forks, + &leader_schedule_cache, + &process_options, + transaction_status_sender.as_ref(), + None, + &accounts_background_request_sender, + ) + .map(|_| (bank_forks, starting_snapshot_hashes)); + + exit.store(true, Ordering::Relaxed); + accounts_background_service.join().unwrap(); + accounts_hash_verifier.join().unwrap(); + if let Some(service) = transaction_status_service { + service.join().unwrap(); + } + + result +} + +pub fn open_blockstore( + ledger_path: &Path, + access_type: AccessType, + wal_recovery_mode: Option, + force_update_to_open: bool, +) -> Blockstore { + let shred_storage_type = get_shred_storage_type( + ledger_path, + &format!( + "Shred stroage type cannot be inferred for ledger at {ledger_path:?}, \ + using default RocksLevel", + ), + ); + + match Blockstore::open_with_options( + ledger_path, + BlockstoreOptions { + access_type: access_type.clone(), + recovery_mode: wal_recovery_mode.clone(), + enforce_ulimit_nofile: true, + column_options: LedgerColumnOptions { + shred_storage_type, + ..LedgerColumnOptions::default() + }, + }, + ) { + Ok(blockstore) => blockstore, + Err(BlockstoreError::RocksDb(err)) + if (err + .to_string() + // Missing column family + .starts_with("Invalid argument: Column family not found:") + || err + .to_string() + // Missing essential file, indicative of blockstore not existing + .starts_with("IO error: No such file or directory:")) + && access_type == AccessType::Secondary => + { + error!("Blockstore is incompatible with current software and requires updates"); + if !force_update_to_open { + error!("Use --force-update-to-open to allow blockstore to update"); + exit(1); + } + open_blockstore_with_temporary_primary_access( + ledger_path, + access_type, + wal_recovery_mode, + ) + .unwrap_or_else(|err| { + error!( + "Failed to open blockstore (with --force-update-to-open) at {:?}: {:?}", + ledger_path, err + ); + exit(1); + }) + } + Err(err) => { + eprintln!("Failed to open blockstore at {ledger_path:?}: {err:?}"); + exit(1); + } + } +} + +/// Open blockstore with temporary primary access to allow necessary, +/// persistent changes to be made to the blockstore (such as creation of new +/// column family(s)). Then, continue opening with `original_access_type` +fn open_blockstore_with_temporary_primary_access( + ledger_path: &Path, + original_access_type: AccessType, + wal_recovery_mode: Option, +) -> Result { + // Open with Primary will allow any configuration that automatically + // updates to take effect + info!("Attempting to temporarily open blockstore with Primary access in order to update"); + { + let _ = Blockstore::open_with_options( + ledger_path, + BlockstoreOptions { + access_type: AccessType::PrimaryForMaintenance, + recovery_mode: wal_recovery_mode.clone(), + enforce_ulimit_nofile: true, + ..BlockstoreOptions::default() + }, + )?; + } + // Now, attempt to open the blockstore with original AccessType + info!( + "Blockstore forced open succeeded, retrying with original access: {:?}", + original_access_type + ); + Blockstore::open_with_options( + ledger_path, + BlockstoreOptions { + access_type: original_access_type, + recovery_mode: wal_recovery_mode, + enforce_ulimit_nofile: true, + ..BlockstoreOptions::default() + }, + ) +} + +pub fn open_genesis_config_by(ledger_path: &Path, matches: &ArgMatches<'_>) -> GenesisConfig { + let max_genesis_archive_unpacked_size = + value_t_or_exit!(matches, "max_genesis_archive_unpacked_size", u64); + open_genesis_config(ledger_path, max_genesis_archive_unpacked_size) +} diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 79a61d7e2..1841f5cb0 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -1,12 +1,11 @@ #![allow(clippy::integer_arithmetic)] use { - crate::{bigtable::*, ledger_path::*, output::*}, + crate::{args::*, bigtable::*, ledger_path::*, ledger_utils::*, output::*}, chrono::{DateTime, Utc}, clap::{ crate_description, crate_name, value_t, value_t_or_exit, values_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand, }, - crossbeam_channel::unbounded, dashmap::DashMap, itertools::Itertools, log::*, @@ -28,57 +27,37 @@ use { }, solana_cli_output::{CliAccount, CliAccountNewConfig, OutputFormat}, solana_core::{ - accounts_hash_verifier::AccountsHashVerifier, system_monitor_service::{SystemMonitorService, SystemMonitorStatsReportConfig}, validator::BlockVerificationMethod, }, solana_entry::entry::Entry, - solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService, - solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_ledger::{ ancestor_iterator::AncestorIterator, - bank_forks_utils, - blockstore::{create_new_ledger, Blockstore, BlockstoreError, PurgeType}, + blockstore::{create_new_ledger, Blockstore, PurgeType}, blockstore_db::{self, columns as cf, Column, ColumnName, Database}, blockstore_options::{ - AccessType, BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions, - ShredStorageType, BLOCKSTORE_DIRECTORY_ROCKS_FIFO, - }, - blockstore_processor::{ - self, BlockstoreProcessorError, ProcessOptions, TransactionStatusSender, + AccessType, BlockstoreRecoveryMode, LedgerColumnOptions, + BLOCKSTORE_DIRECTORY_ROCKS_FIFO, }, + blockstore_processor::ProcessOptions, shred::Shred, }, solana_measure::{measure, measure::Measure}, - solana_rpc::{ - transaction_notifier_interface::TransactionNotifierLock, - transaction_status_service::TransactionStatusService, - }, solana_runtime::{ accounts::Accounts, - accounts_background_service::{ - AbsRequestHandlers, AbsRequestSender, AccountsBackgroundService, - PrunedBanksRequestHandler, SnapshotRequestHandler, - }, - accounts_db::{ - AccountsDb, AccountsDbConfig, CalcAccountsHashDataSource, FillerAccountsConfig, - }, - accounts_index::{AccountsIndexConfig, IndexLimitMb, ScanConfig}, - accounts_update_notifier_interface::AccountsUpdateNotifier, + accounts_db::CalcAccountsHashDataSource, + accounts_index::ScanConfig, bank::{Bank, RewardCalculationEvent, TotalAccountsStats}, bank_forks::BankForks, cost_model::CostModel, cost_tracker::CostTracker, - hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, + hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, runtime_config::RuntimeConfig, snapshot_archive_info::SnapshotArchiveInfoGetter, - snapshot_config::SnapshotConfig, - snapshot_hash::StartingSnapshotHashes, snapshot_minimizer::SnapshotMinimizer, snapshot_utils::{ - self, clean_orphaned_account_snapshot_dirs, create_all_accounts_run_and_snapshot_dirs, - move_and_async_delete_path, ArchiveFormat, SnapshotVersion, - DEFAULT_ARCHIVE_COMPRESSION, SUPPORTED_ARCHIVE_COMPRESSION, + self, ArchiveFormat, SnapshotVersion, DEFAULT_ARCHIVE_COMPRESSION, + SUPPORTED_ARCHIVE_COMPRESSION, }, }, solana_sdk::{ @@ -87,24 +66,20 @@ use { clock::{Epoch, Slot}, feature::{self, Feature}, feature_set::{self, FeatureSet}, - genesis_config::{ClusterType, GenesisConfig}, + genesis_config::ClusterType, hash::Hash, inflation::Inflation, native_token::{lamports_to_sol, sol_to_lamports, Sol}, pubkey::Pubkey, rent::Rent, shred_version::compute_shred_version, - signature::Signer, - signer::keypair::Keypair, stake::{self, state::StakeState}, system_program, - timing::timestamp, transaction::{ MessageHash, SanitizedTransaction, SimpleAddressLoader, VersionedTransaction, }, }, solana_stake_program::stake_state::{self, PointValue}, - solana_streamer::socket::SocketAddrSpace, solana_vote_program::{ self, vote_state::{self, VoteState}, @@ -126,8 +101,10 @@ use { }, }; +mod args; mod bigtable; mod ledger_path; +mod ledger_utils; mod output; #[derive(PartialEq, Eq)] @@ -850,121 +827,6 @@ fn analyze_storage(database: &Database) { analyze_column::(database, "OptimisticSlots"); } -/// Open blockstore with temporary primary access to allow necessary, -/// persistent changes to be made to the blockstore (such as creation of new -/// column family(s)). Then, continue opening with `original_access_type` -fn open_blockstore_with_temporary_primary_access( - ledger_path: &Path, - original_access_type: AccessType, - wal_recovery_mode: Option, -) -> Result { - // Open with Primary will allow any configuration that automatically - // updates to take effect - info!("Attempting to temporarily open blockstore with Primary access in order to update"); - { - let _ = Blockstore::open_with_options( - ledger_path, - BlockstoreOptions { - access_type: AccessType::PrimaryForMaintenance, - recovery_mode: wal_recovery_mode.clone(), - enforce_ulimit_nofile: true, - ..BlockstoreOptions::default() - }, - )?; - } - // Now, attempt to open the blockstore with original AccessType - info!( - "Blockstore forced open succeeded, retrying with original access: {:?}", - original_access_type - ); - Blockstore::open_with_options( - ledger_path, - BlockstoreOptions { - access_type: original_access_type, - recovery_mode: wal_recovery_mode, - enforce_ulimit_nofile: true, - ..BlockstoreOptions::default() - }, - ) -} - -fn get_shred_storage_type(ledger_path: &Path, message: &str) -> ShredStorageType { - // TODO: the following shred_storage_type inference must be updated once - // the rocksdb options can be constructed via load_options_file() as the - // value picked by passing None for `max_shred_storage_size` could affect - // the persisted rocksdb options file. - match ShredStorageType::from_ledger_path(ledger_path, None) { - Some(s) => s, - None => { - info!("{}", message); - ShredStorageType::RocksLevel - } - } -} - -fn open_blockstore( - ledger_path: &Path, - access_type: AccessType, - wal_recovery_mode: Option, - force_update_to_open: bool, -) -> Blockstore { - let shred_storage_type = get_shred_storage_type( - ledger_path, - &format!( - "Shred stroage type cannot be inferred for ledger at {ledger_path:?}, \ - using default RocksLevel", - ), - ); - - match Blockstore::open_with_options( - ledger_path, - BlockstoreOptions { - access_type: access_type.clone(), - recovery_mode: wal_recovery_mode.clone(), - enforce_ulimit_nofile: true, - column_options: LedgerColumnOptions { - shred_storage_type, - ..LedgerColumnOptions::default() - }, - }, - ) { - Ok(blockstore) => blockstore, - Err(BlockstoreError::RocksDb(err)) - if (err - .to_string() - // Missing column family - .starts_with("Invalid argument: Column family not found:") - || err - .to_string() - // Missing essential file, indicative of blockstore not existing - .starts_with("IO error: No such file or directory:")) - && access_type == AccessType::Secondary => - { - error!("Blockstore is incompatible with current software and requires updates"); - if !force_update_to_open { - error!("Use --force-update-to-open to allow blockstore to update"); - exit(1); - } - open_blockstore_with_temporary_primary_access( - ledger_path, - access_type, - wal_recovery_mode, - ) - .unwrap_or_else(|err| { - error!( - "Failed to open blockstore (with --force-update-to-open) at {:?}: {:?}", - ledger_path, err - ); - exit(1); - }) - } - Err(err) => { - eprintln!("Failed to open blockstore at {ledger_path:?}: {err:?}"); - exit(1); - } - } -} - fn raw_key_to_slot(key: &[u8], column_name: &str) -> Option { match column_name { cf::SlotMeta::NAME => Some(cf::SlotMeta::slot(cf::SlotMeta::index(key))), @@ -1032,328 +894,6 @@ fn print_blockstore_file_metadata( Ok(()) } -// This function is duplicated in validator/src/main.rs... -fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option> { - if matches.is_present(name) { - Some(values_t_or_exit!(matches, name, Slot)) - } else { - None - } -} - -// Build an `AccountsDbConfig` from subcommand arguments. All of the arguments -// matched by this functional are either optional or have a default value. -// Thus, a subcommand need not support all of the arguments that are matched -// by this function. -fn get_accounts_db_config(ledger_path: &Path, arg_matches: &ArgMatches<'_>) -> AccountsDbConfig { - let accounts_index_bins = value_t!(arg_matches, "accounts_index_bins", usize).ok(); - let accounts_index_index_limit_mb = - if let Some(limit) = value_t!(arg_matches, "accounts_index_memory_limit_mb", usize).ok() { - IndexLimitMb::Limit(limit) - } else if arg_matches.is_present("disable_accounts_disk_index") { - IndexLimitMb::InMemOnly - } else { - IndexLimitMb::Unspecified - }; - let accounts_index_drives: Vec = if arg_matches.is_present("accounts_index_path") { - values_t_or_exit!(arg_matches, "accounts_index_path", String) - .into_iter() - .map(PathBuf::from) - .collect() - } else { - vec![ledger_path.join("accounts_index.ledger-tool")] - }; - let accounts_index_config = AccountsIndexConfig { - bins: accounts_index_bins, - index_limit_mb: accounts_index_index_limit_mb, - drives: Some(accounts_index_drives), - ..AccountsIndexConfig::default() - }; - - let filler_accounts_config = FillerAccountsConfig { - count: value_t!(arg_matches, "accounts_filler_count", usize).unwrap_or(0), - size: value_t!(arg_matches, "accounts_filler_size", usize).unwrap_or(0), - }; - - AccountsDbConfig { - index: Some(accounts_index_config), - accounts_hash_cache_path: Some(ledger_path.join(AccountsDb::ACCOUNTS_HASH_CACHE_DIR)), - filler_accounts_config, - ancient_append_vec_offset: value_t!(arg_matches, "accounts_db_ancient_append_vecs", i64) - .ok(), - exhaustively_verify_refcounts: arg_matches.is_present("accounts_db_verify_refcounts"), - skip_initial_hash_calc: arg_matches.is_present("accounts_db_skip_initial_hash_calculation"), - ..AccountsDbConfig::default() - } -} - -fn load_bank_forks( - arg_matches: &ArgMatches, - genesis_config: &GenesisConfig, - blockstore: Arc, - process_options: ProcessOptions, - snapshot_archive_path: Option, - incremental_snapshot_archive_path: Option, -) -> Result<(Arc>, Option), BlockstoreProcessorError> { - let bank_snapshots_dir = blockstore - .ledger_path() - .join(if blockstore.is_primary_access() { - "snapshot" - } else { - "snapshot.ledger-tool" - }); - - let mut starting_slot = 0; // default start check with genesis - let snapshot_config = if arg_matches.is_present("no_snapshot") { - None - } else { - let full_snapshot_archives_dir = - snapshot_archive_path.unwrap_or_else(|| blockstore.ledger_path().to_path_buf()); - let incremental_snapshot_archives_dir = - incremental_snapshot_archive_path.unwrap_or_else(|| full_snapshot_archives_dir.clone()); - if let Some(full_snapshot_slot) = - snapshot_utils::get_highest_full_snapshot_archive_slot(&full_snapshot_archives_dir) - { - let incremental_snapshot_slot = - snapshot_utils::get_highest_incremental_snapshot_archive_slot( - &incremental_snapshot_archives_dir, - full_snapshot_slot, - ) - .unwrap_or_default(); - starting_slot = std::cmp::max(full_snapshot_slot, incremental_snapshot_slot); - } - - Some(SnapshotConfig { - full_snapshot_archives_dir, - incremental_snapshot_archives_dir, - bank_snapshots_dir: bank_snapshots_dir.clone(), - ..SnapshotConfig::new_load_only() - }) - }; - - match process_options.halt_at_slot { - // Skip the following checks for sentinel values of Some(0) and None. - // For Some(0), no slots will be be replayed after starting_slot. - // For None, all available children of starting_slot will be replayed. - None | Some(0) => {} - Some(halt_slot) => { - if halt_slot < starting_slot { - eprintln!( - "Unable to load bank forks at slot {halt_slot} because it is less than the starting slot {starting_slot}. \ - The starting slot will be the latest snapshot slot, or genesis if --no-snapshot flag specified or no snapshots found." - ); - exit(1); - } - // Check if we have the slot data necessary to replay from starting_slot to >= halt_slot. - if !blockstore.slot_range_connected(starting_slot, halt_slot) { - eprintln!( - "Unable to load bank forks at slot {halt_slot} due to disconnected blocks.", - ); - exit(1); - } - } - } - - let account_paths = if let Some(account_paths) = arg_matches.value_of("account_paths") { - // If this blockstore access is Primary, no other process (solana-validator) can hold - // Primary access. So, allow a custom accounts path without worry of wiping the accounts - // of solana-validator. - if !blockstore.is_primary_access() { - // Attempt to open the Blockstore in Primary access; if successful, no other process - // was holding Primary so allow things to proceed with custom accounts path. Release - // the Primary access instead of holding it to give priority to solana-validator over - // solana-ledger-tool should solana-validator start before we've finished. - info!( - "Checking if another process currently holding Primary access to {:?}", - blockstore.ledger_path() - ); - if Blockstore::open_with_options( - blockstore.ledger_path(), - BlockstoreOptions { - access_type: AccessType::PrimaryForMaintenance, - ..BlockstoreOptions::default() - }, - ) - .is_err() - { - // Couldn't get Primary access, error out to be defensive. - eprintln!("Error: custom accounts path is not supported under secondary access"); - exit(1); - } - } - account_paths.split(',').map(PathBuf::from).collect() - } else if blockstore.is_primary_access() { - vec![blockstore.ledger_path().join("accounts")] - } else { - let non_primary_accounts_path = blockstore.ledger_path().join("accounts.ledger-tool"); - info!( - "Default accounts path is switched aligning with Blockstore's secondary access: {:?}", - non_primary_accounts_path - ); - vec![non_primary_accounts_path] - }; - - let (account_run_paths, account_snapshot_paths) = - create_all_accounts_run_and_snapshot_dirs(&account_paths).unwrap_or_else(|err| { - eprintln!("Error: {err:?}"); - exit(1); - }); - - // From now on, use run/ paths in the same way as the previous account_paths. - let account_paths = account_run_paths; - - info!("Cleaning contents of account paths: {:?}", account_paths); - let mut measure = Measure::start("clean_accounts_paths"); - account_paths.iter().for_each(|path| { - if path.exists() { - move_and_async_delete_path(path); - } - }); - measure.stop(); - info!("done. {}", measure); - - info!( - "Cleaning contents of account snapshot paths: {:?}", - account_snapshot_paths - ); - if let Err(e) = - clean_orphaned_account_snapshot_dirs(&bank_snapshots_dir, &account_snapshot_paths) - { - eprintln!("Failed to clean orphaned account snapshot dirs. Error: {e:?}"); - exit(1); - } - - let mut accounts_update_notifier = Option::::default(); - let mut transaction_notifier = Option::::default(); - if arg_matches.is_present("geyser_plugin_config") { - let geyser_config_files = values_t_or_exit!(arg_matches, "geyser_plugin_config", String) - .into_iter() - .map(PathBuf::from) - .collect::>(); - - let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded(); - drop(confirmed_bank_sender); - let geyser_service = - GeyserPluginService::new(confirmed_bank_receiver, &geyser_config_files).unwrap_or_else( - |err| { - eprintln!("Failed to setup Geyser service: {err:?}"); - exit(1); - }, - ); - accounts_update_notifier = geyser_service.get_accounts_update_notifier(); - transaction_notifier = geyser_service.get_transaction_notifier(); - } - - let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) = - bank_forks_utils::load_bank_forks( - genesis_config, - blockstore.as_ref(), - account_paths, - None, - snapshot_config.as_ref(), - &process_options, - None, - accounts_update_notifier, - &Arc::default(), - ); - let block_verification_method = value_t!( - arg_matches, - "block_verification_method", - BlockVerificationMethod - ) - .unwrap_or_default(); - info!( - "Using: block-verification-method: {}", - block_verification_method, - ); - - let exit = Arc::new(AtomicBool::new(false)); - let node_id = Arc::new(Keypair::new()); - let cluster_info = Arc::new(ClusterInfo::new( - ContactInfo::new_localhost(&node_id.pubkey(), timestamp()), - Arc::clone(&node_id), - SocketAddrSpace::Unspecified, - )); - let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded(); - let accounts_hash_verifier = AccountsHashVerifier::new( - accounts_package_sender.clone(), - accounts_package_receiver, - None, - exit.clone(), - cluster_info, - None, - false, - None, - SnapshotConfig::new_load_only(), - ); - let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded(); - let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender.clone()); - let snapshot_request_handler = SnapshotRequestHandler { - snapshot_config: SnapshotConfig::new_load_only(), - snapshot_request_sender, - snapshot_request_receiver, - accounts_package_sender, - }; - let pruned_banks_receiver = - AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone()); - let pruned_banks_request_handler = PrunedBanksRequestHandler { - pruned_banks_receiver, - }; - let abs_request_handler = AbsRequestHandlers { - snapshot_request_handler, - pruned_banks_request_handler, - }; - let accounts_background_service = AccountsBackgroundService::new( - bank_forks.clone(), - exit.clone(), - abs_request_handler, - process_options.accounts_db_test_hash_calculation, - None, - ); - - let (transaction_status_sender, transaction_status_service) = if transaction_notifier.is_some() - { - let (transaction_status_sender, transaction_status_receiver) = unbounded(); - let transaction_status_service = TransactionStatusService::new( - transaction_status_receiver, - Arc::default(), - false, - transaction_notifier, - blockstore.clone(), - false, - &exit, - ); - ( - Some(TransactionStatusSender { - sender: transaction_status_sender, - }), - Some(transaction_status_service), - ) - } else { - (None, None) - }; - - let result = blockstore_processor::process_blockstore_from_root( - blockstore.as_ref(), - &bank_forks, - &leader_schedule_cache, - &process_options, - transaction_status_sender.as_ref(), - None, - &accounts_background_request_sender, - ) - .map(|_| (bank_forks, starting_snapshot_hashes)); - - exit.store(true, Ordering::Relaxed); - accounts_background_service.join().unwrap(); - accounts_hash_verifier.join().unwrap(); - if let Some(service) = transaction_status_service { - service.join().unwrap(); - } - - result -} - fn compute_slot_cost(blockstore: &Blockstore, slot: Slot) -> Result<(), String> { if blockstore.is_dead(slot) { return Err("Dead slot".to_string()); @@ -1413,12 +953,6 @@ fn compute_slot_cost(blockstore: &Blockstore, slot: Slot) -> Result<(), String> Ok(()) } -fn open_genesis_config_by(ledger_path: &Path, matches: &ArgMatches<'_>) -> GenesisConfig { - let max_genesis_archive_unpacked_size = - value_t_or_exit!(matches, "max_genesis_archive_unpacked_size", u64); - open_genesis_config(ledger_path, max_genesis_archive_unpacked_size) -} - /// Finds the accounts needed to replay slots `snapshot_slot` to `ending_slot`. /// Removes all other accounts from accounts_db, and updates the accounts hash /// and capitalization. This is used by the --minimize option in create-snapshot