Refactor ledger-tool moving some of functions to a separate module (#31359)

* Refactor ledger-tool moving some of functions to a separate module

* Rename utils to ledger_utils, move arg parsing funcs to a new module
Dmitri Makarov 2023-04-27 14:11:04 -04:00 committed by GitHub
parent 9af7009bb4
commit e03826b6d8
3 changed files with 522 additions and 479 deletions

ledger-tool/src/args.rs Normal file

@ -0,0 +1,67 @@
use {
clap::{value_t, values_t_or_exit, ArgMatches},
solana_runtime::{
accounts_db::{AccountsDb, AccountsDbConfig, FillerAccountsConfig},
accounts_index::{AccountsIndexConfig, IndexLimitMb},
},
solana_sdk::clock::Slot,
std::path::{Path, PathBuf},
};
// Build an `AccountsDbConfig` from subcommand arguments. All of the arguments
// matched by this function are either optional or have a default value.
// Thus, a subcommand need not support all of the arguments that are matched
// by this function.
pub fn get_accounts_db_config(
ledger_path: &Path,
arg_matches: &ArgMatches<'_>,
) -> AccountsDbConfig {
let accounts_index_bins = value_t!(arg_matches, "accounts_index_bins", usize).ok();
let accounts_index_index_limit_mb =
if let Some(limit) = value_t!(arg_matches, "accounts_index_memory_limit_mb", usize).ok() {
IndexLimitMb::Limit(limit)
} else if arg_matches.is_present("disable_accounts_disk_index") {
IndexLimitMb::InMemOnly
} else {
IndexLimitMb::Unspecified
};
let accounts_index_drives: Vec<PathBuf> = if arg_matches.is_present("accounts_index_path") {
values_t_or_exit!(arg_matches, "accounts_index_path", String)
.into_iter()
.map(PathBuf::from)
.collect()
} else {
vec![ledger_path.join("accounts_index.ledger-tool")]
};
let accounts_index_config = AccountsIndexConfig {
bins: accounts_index_bins,
index_limit_mb: accounts_index_index_limit_mb,
drives: Some(accounts_index_drives),
..AccountsIndexConfig::default()
};
let filler_accounts_config = FillerAccountsConfig {
count: value_t!(arg_matches, "accounts_filler_count", usize).unwrap_or(0),
size: value_t!(arg_matches, "accounts_filler_size", usize).unwrap_or(0),
};
AccountsDbConfig {
index: Some(accounts_index_config),
accounts_hash_cache_path: Some(ledger_path.join(AccountsDb::ACCOUNTS_HASH_CACHE_DIR)),
filler_accounts_config,
ancient_append_vec_offset: value_t!(arg_matches, "accounts_db_ancient_append_vecs", i64)
.ok(),
exhaustively_verify_refcounts: arg_matches.is_present("accounts_db_verify_refcounts"),
skip_initial_hash_calc: arg_matches.is_present("accounts_db_skip_initial_hash_calculation"),
..AccountsDbConfig::default()
}
}
// This function is duplicated in validator/src/main.rs...
pub fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option<Vec<Slot>> {
if matches.is_present(name) {
Some(values_t_or_exit!(matches, name, Slot))
} else {
None
}
}
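
For orientation, here is a minimal sketch (not part of this commit) of how a ledger-tool subcommand might consume these two helpers when assembling its processing options. The `ProcessOptions` field names used below (`new_hard_forks`, `halt_at_slot`, `accounts_db_config`) and the argument names (`hard_forks`, `halt_at_slot`) are assumptions inferred from how `load_bank_forks` reads them, not guarantees about the actual struct or CLI.

use {
    crate::args::{get_accounts_db_config, hardforks_of},
    clap::{value_t, ArgMatches},
    solana_ledger::blockstore_processor::ProcessOptions,
    solana_sdk::clock::Slot,
    std::path::Path,
};

// Hypothetical helper: fold the shared AccountsDb and hard-fork arguments
// into a ProcessOptions for one subcommand (field names are assumptions).
fn to_process_options(ledger_path: &Path, arg_matches: &ArgMatches<'_>) -> ProcessOptions {
    ProcessOptions {
        new_hard_forks: hardforks_of(arg_matches, "hard_forks"),
        halt_at_slot: value_t!(arg_matches, "halt_at_slot", Slot).ok(),
        accounts_db_config: Some(get_accounts_db_config(ledger_path, arg_matches)),
        ..ProcessOptions::default()
    }
}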

ledger-tool/src/ledger_utils.rs Normal file

@ -0,0 +1,442 @@
use {
clap::{value_t, value_t_or_exit, values_t_or_exit, ArgMatches},
crossbeam_channel::unbounded,
log::*,
solana_core::{
accounts_hash_verifier::AccountsHashVerifier, validator::BlockVerificationMethod,
},
solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_ledger::{
bank_forks_utils,
blockstore::{Blockstore, BlockstoreError},
blockstore_options::{
AccessType, BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions,
ShredStorageType,
},
blockstore_processor::{
self, BlockstoreProcessorError, ProcessOptions, TransactionStatusSender,
},
},
solana_measure::measure::Measure,
solana_rpc::{
transaction_notifier_interface::TransactionNotifierLock,
transaction_status_service::TransactionStatusService,
},
solana_runtime::{
accounts_background_service::{
AbsRequestHandlers, AbsRequestSender, AccountsBackgroundService,
PrunedBanksRequestHandler, SnapshotRequestHandler,
},
accounts_update_notifier_interface::AccountsUpdateNotifier,
bank_forks::BankForks,
hardened_unpack::open_genesis_config,
snapshot_config::SnapshotConfig,
snapshot_hash::StartingSnapshotHashes,
snapshot_utils::{
self, clean_orphaned_account_snapshot_dirs, create_all_accounts_run_and_snapshot_dirs,
move_and_async_delete_path,
},
},
solana_sdk::{
genesis_config::GenesisConfig, signature::Signer, signer::keypair::Keypair,
timing::timestamp,
},
solana_streamer::socket::SocketAddrSpace,
std::{
path::{Path, PathBuf},
process::exit,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
},
};
pub fn get_shred_storage_type(ledger_path: &Path, message: &str) -> ShredStorageType {
// TODO: the following shred_storage_type inference must be updated once
// the rocksdb options can be constructed via load_options_file() as the
// value picked by passing None for `max_shred_storage_size` could affect
// the persisted rocksdb options file.
match ShredStorageType::from_ledger_path(ledger_path, None) {
Some(s) => s,
None => {
info!("{}", message);
ShredStorageType::RocksLevel
}
}
}
pub fn load_bank_forks(
arg_matches: &ArgMatches,
genesis_config: &GenesisConfig,
blockstore: Arc<Blockstore>,
process_options: ProcessOptions,
snapshot_archive_path: Option<PathBuf>,
incremental_snapshot_archive_path: Option<PathBuf>,
) -> Result<(Arc<RwLock<BankForks>>, Option<StartingSnapshotHashes>), BlockstoreProcessorError> {
let bank_snapshots_dir = blockstore
.ledger_path()
.join(if blockstore.is_primary_access() {
"snapshot"
} else {
"snapshot.ledger-tool"
});
let mut starting_slot = 0; // default start check with genesis
let snapshot_config = if arg_matches.is_present("no_snapshot") {
None
} else {
let full_snapshot_archives_dir =
snapshot_archive_path.unwrap_or_else(|| blockstore.ledger_path().to_path_buf());
let incremental_snapshot_archives_dir =
incremental_snapshot_archive_path.unwrap_or_else(|| full_snapshot_archives_dir.clone());
if let Some(full_snapshot_slot) =
snapshot_utils::get_highest_full_snapshot_archive_slot(&full_snapshot_archives_dir)
{
let incremental_snapshot_slot =
snapshot_utils::get_highest_incremental_snapshot_archive_slot(
&incremental_snapshot_archives_dir,
full_snapshot_slot,
)
.unwrap_or_default();
starting_slot = std::cmp::max(full_snapshot_slot, incremental_snapshot_slot);
}
Some(SnapshotConfig {
full_snapshot_archives_dir,
incremental_snapshot_archives_dir,
bank_snapshots_dir: bank_snapshots_dir.clone(),
..SnapshotConfig::new_load_only()
})
};
match process_options.halt_at_slot {
// Skip the following checks for sentinel values of Some(0) and None.
// For Some(0), no slots will be replayed after starting_slot.
// For None, all available children of starting_slot will be replayed.
None | Some(0) => {}
Some(halt_slot) => {
if halt_slot < starting_slot {
eprintln!(
"Unable to load bank forks at slot {halt_slot} because it is less than the starting slot {starting_slot}. \
The starting slot will be the latest snapshot slot, or genesis if the --no-snapshot flag is specified or no snapshots are found."
);
exit(1);
}
// Check if we have the slot data necessary to replay from starting_slot to >= halt_slot.
if !blockstore.slot_range_connected(starting_slot, halt_slot) {
eprintln!(
"Unable to load bank forks at slot {halt_slot} due to disconnected blocks.",
);
exit(1);
}
}
}
let account_paths = if let Some(account_paths) = arg_matches.value_of("account_paths") {
// If this blockstore access is Primary, no other process (solana-validator) can hold
// Primary access. So, allow a custom accounts path without worry of wiping the accounts
// of solana-validator.
if !blockstore.is_primary_access() {
// Attempt to open the Blockstore in Primary access; if successful, no other process
// was holding Primary so allow things to proceed with custom accounts path. Release
// the Primary access instead of holding it to give priority to solana-validator over
// solana-ledger-tool should solana-validator start before we've finished.
info!(
"Checking if another process currently holding Primary access to {:?}",
blockstore.ledger_path()
);
if Blockstore::open_with_options(
blockstore.ledger_path(),
BlockstoreOptions {
access_type: AccessType::PrimaryForMaintenance,
..BlockstoreOptions::default()
},
)
.is_err()
{
// Couldn't get Primary access, error out to be defensive.
eprintln!("Error: custom accounts path is not supported under secondary access");
exit(1);
}
}
account_paths.split(',').map(PathBuf::from).collect()
} else if blockstore.is_primary_access() {
vec![blockstore.ledger_path().join("accounts")]
} else {
let non_primary_accounts_path = blockstore.ledger_path().join("accounts.ledger-tool");
info!(
"Default accounts path is switched aligning with Blockstore's secondary access: {:?}",
non_primary_accounts_path
);
vec![non_primary_accounts_path]
};
let (account_run_paths, account_snapshot_paths) =
create_all_accounts_run_and_snapshot_dirs(&account_paths).unwrap_or_else(|err| {
eprintln!("Error: {err:?}");
exit(1);
});
// From now on, use run/ paths in the same way as the previous account_paths.
let account_paths = account_run_paths;
info!("Cleaning contents of account paths: {:?}", account_paths);
let mut measure = Measure::start("clean_accounts_paths");
account_paths.iter().for_each(|path| {
if path.exists() {
move_and_async_delete_path(path);
}
});
measure.stop();
info!("done. {}", measure);
info!(
"Cleaning contents of account snapshot paths: {:?}",
account_snapshot_paths
);
if let Err(e) =
clean_orphaned_account_snapshot_dirs(&bank_snapshots_dir, &account_snapshot_paths)
{
eprintln!("Failed to clean orphaned account snapshot dirs. Error: {e:?}");
exit(1);
}
let mut accounts_update_notifier = Option::<AccountsUpdateNotifier>::default();
let mut transaction_notifier = Option::<TransactionNotifierLock>::default();
if arg_matches.is_present("geyser_plugin_config") {
let geyser_config_files = values_t_or_exit!(arg_matches, "geyser_plugin_config", String)
.into_iter()
.map(PathBuf::from)
.collect::<Vec<_>>();
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
drop(confirmed_bank_sender);
let geyser_service =
GeyserPluginService::new(confirmed_bank_receiver, &geyser_config_files).unwrap_or_else(
|err| {
eprintln!("Failed to setup Geyser service: {err:?}");
exit(1);
},
);
accounts_update_notifier = geyser_service.get_accounts_update_notifier();
transaction_notifier = geyser_service.get_transaction_notifier();
}
let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) =
bank_forks_utils::load_bank_forks(
genesis_config,
blockstore.as_ref(),
account_paths,
None,
snapshot_config.as_ref(),
&process_options,
None,
accounts_update_notifier,
&Arc::default(),
);
let block_verification_method = value_t!(
arg_matches,
"block_verification_method",
BlockVerificationMethod
)
.unwrap_or_default();
info!(
"Using: block-verification-method: {}",
block_verification_method,
);
let exit = Arc::new(AtomicBool::new(false));
let node_id = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
ContactInfo::new_localhost(&node_id.pubkey(), timestamp()),
Arc::clone(&node_id),
SocketAddrSpace::Unspecified,
));
let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded();
let accounts_hash_verifier = AccountsHashVerifier::new(
accounts_package_sender.clone(),
accounts_package_receiver,
None,
exit.clone(),
cluster_info,
None,
false,
None,
SnapshotConfig::new_load_only(),
);
let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender.clone());
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: SnapshotConfig::new_load_only(),
snapshot_request_sender,
snapshot_request_receiver,
accounts_package_sender,
};
let pruned_banks_receiver =
AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
let pruned_banks_request_handler = PrunedBanksRequestHandler {
pruned_banks_receiver,
};
let abs_request_handler = AbsRequestHandlers {
snapshot_request_handler,
pruned_banks_request_handler,
};
let accounts_background_service = AccountsBackgroundService::new(
bank_forks.clone(),
exit.clone(),
abs_request_handler,
process_options.accounts_db_test_hash_calculation,
None,
);
let (transaction_status_sender, transaction_status_service) = if transaction_notifier.is_some()
{
let (transaction_status_sender, transaction_status_receiver) = unbounded();
let transaction_status_service = TransactionStatusService::new(
transaction_status_receiver,
Arc::default(),
false,
transaction_notifier,
blockstore.clone(),
false,
&exit,
);
(
Some(TransactionStatusSender {
sender: transaction_status_sender,
}),
Some(transaction_status_service),
)
} else {
(None, None)
};
let result = blockstore_processor::process_blockstore_from_root(
blockstore.as_ref(),
&bank_forks,
&leader_schedule_cache,
&process_options,
transaction_status_sender.as_ref(),
None,
&accounts_background_request_sender,
)
.map(|_| (bank_forks, starting_snapshot_hashes));
exit.store(true, Ordering::Relaxed);
accounts_background_service.join().unwrap();
accounts_hash_verifier.join().unwrap();
if let Some(service) = transaction_status_service {
service.join().unwrap();
}
result
}
pub fn open_blockstore(
ledger_path: &Path,
access_type: AccessType,
wal_recovery_mode: Option<BlockstoreRecoveryMode>,
force_update_to_open: bool,
) -> Blockstore {
let shred_storage_type = get_shred_storage_type(
ledger_path,
&format!(
"Shred stroage type cannot be inferred for ledger at {ledger_path:?}, \
using default RocksLevel",
),
);
match Blockstore::open_with_options(
ledger_path,
BlockstoreOptions {
access_type: access_type.clone(),
recovery_mode: wal_recovery_mode.clone(),
enforce_ulimit_nofile: true,
column_options: LedgerColumnOptions {
shred_storage_type,
..LedgerColumnOptions::default()
},
},
) {
Ok(blockstore) => blockstore,
Err(BlockstoreError::RocksDb(err))
if (err
.to_string()
// Missing column family
.starts_with("Invalid argument: Column family not found:")
|| err
.to_string()
// Missing essential file, indicative of blockstore not existing
.starts_with("IO error: No such file or directory:"))
&& access_type == AccessType::Secondary =>
{
error!("Blockstore is incompatible with current software and requires updates");
if !force_update_to_open {
error!("Use --force-update-to-open to allow blockstore to update");
exit(1);
}
open_blockstore_with_temporary_primary_access(
ledger_path,
access_type,
wal_recovery_mode,
)
.unwrap_or_else(|err| {
error!(
"Failed to open blockstore (with --force-update-to-open) at {:?}: {:?}",
ledger_path, err
);
exit(1);
})
}
Err(err) => {
eprintln!("Failed to open blockstore at {ledger_path:?}: {err:?}");
exit(1);
}
}
}
/// Open blockstore with temporary primary access to allow necessary,
/// persistent changes to be made to the blockstore (such as creation of new
/// column family(s)). Then, continue opening with `original_access_type`
fn open_blockstore_with_temporary_primary_access(
ledger_path: &Path,
original_access_type: AccessType,
wal_recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Blockstore, BlockstoreError> {
// Opening with Primary access will allow any configuration that automatically
// updates to take effect
info!("Attempting to temporarily open blockstore with Primary access in order to update");
{
let _ = Blockstore::open_with_options(
ledger_path,
BlockstoreOptions {
access_type: AccessType::PrimaryForMaintenance,
recovery_mode: wal_recovery_mode.clone(),
enforce_ulimit_nofile: true,
..BlockstoreOptions::default()
},
)?;
}
// Now, attempt to open the blockstore with original AccessType
info!(
"Blockstore forced open succeeded, retrying with original access: {:?}",
original_access_type
);
Blockstore::open_with_options(
ledger_path,
BlockstoreOptions {
access_type: original_access_type,
recovery_mode: wal_recovery_mode,
enforce_ulimit_nofile: true,
..BlockstoreOptions::default()
},
)
}
pub fn open_genesis_config_by(ledger_path: &Path, matches: &ArgMatches<'_>) -> GenesisConfig {
let max_genesis_archive_unpacked_size =
value_t_or_exit!(matches, "max_genesis_archive_unpacked_size", u64);
open_genesis_config(ledger_path, max_genesis_archive_unpacked_size)
}
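
Taken together, a hedged sketch of the call sequence a read-only subcommand in main.rs could use after this refactor; the locals `ledger_path`, `arg_matches`, `wal_recovery_mode`, `force_update_to_open`, `process_options`, and the two snapshot path options are assumed to come from earlier argument parsing and are placeholders, not code introduced by this commit.

// Illustrative flow only: open the ledger with secondary access, load bank
// forks, and take the working bank for inspection.
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
let blockstore = Arc::new(open_blockstore(
    &ledger_path,
    AccessType::Secondary,
    wal_recovery_mode,
    force_update_to_open,
));
let (bank_forks, _starting_snapshot_hashes) = load_bank_forks(
    arg_matches,
    &genesis_config,
    blockstore.clone(),
    process_options,
    snapshot_archive_path,
    incremental_snapshot_archive_path,
)
.unwrap_or_else(|err| {
    eprintln!("Failed to load ledger: {err:?}");
    exit(1);
});
let bank = bank_forks.read().unwrap().working_bank();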

ledger-tool/src/main.rs

@ -1,12 +1,11 @@
#![allow(clippy::integer_arithmetic)]
use {
crate::{bigtable::*, ledger_path::*, output::*},
crate::{args::*, bigtable::*, ledger_path::*, ledger_utils::*, output::*},
chrono::{DateTime, Utc},
clap::{
crate_description, crate_name, value_t, value_t_or_exit, values_t_or_exit, App,
AppSettings, Arg, ArgMatches, SubCommand,
},
crossbeam_channel::unbounded,
dashmap::DashMap,
itertools::Itertools,
log::*,
@ -28,57 +27,37 @@ use {
},
solana_cli_output::{CliAccount, CliAccountNewConfig, OutputFormat},
solana_core::{
accounts_hash_verifier::AccountsHashVerifier,
system_monitor_service::{SystemMonitorService, SystemMonitorStatsReportConfig},
validator::BlockVerificationMethod,
},
solana_entry::entry::Entry,
solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_ledger::{
ancestor_iterator::AncestorIterator,
bank_forks_utils,
blockstore::{create_new_ledger, Blockstore, BlockstoreError, PurgeType},
blockstore::{create_new_ledger, Blockstore, PurgeType},
blockstore_db::{self, columns as cf, Column, ColumnName, Database},
blockstore_options::{
AccessType, BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions,
ShredStorageType, BLOCKSTORE_DIRECTORY_ROCKS_FIFO,
},
blockstore_processor::{
self, BlockstoreProcessorError, ProcessOptions, TransactionStatusSender,
AccessType, BlockstoreRecoveryMode, LedgerColumnOptions,
BLOCKSTORE_DIRECTORY_ROCKS_FIFO,
},
blockstore_processor::ProcessOptions,
shred::Shred,
},
solana_measure::{measure, measure::Measure},
solana_rpc::{
transaction_notifier_interface::TransactionNotifierLock,
transaction_status_service::TransactionStatusService,
},
solana_runtime::{
accounts::Accounts,
accounts_background_service::{
AbsRequestHandlers, AbsRequestSender, AccountsBackgroundService,
PrunedBanksRequestHandler, SnapshotRequestHandler,
},
accounts_db::{
AccountsDb, AccountsDbConfig, CalcAccountsHashDataSource, FillerAccountsConfig,
},
accounts_index::{AccountsIndexConfig, IndexLimitMb, ScanConfig},
accounts_update_notifier_interface::AccountsUpdateNotifier,
accounts_db::CalcAccountsHashDataSource,
accounts_index::ScanConfig,
bank::{Bank, RewardCalculationEvent, TotalAccountsStats},
bank_forks::BankForks,
cost_model::CostModel,
cost_tracker::CostTracker,
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
runtime_config::RuntimeConfig,
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
snapshot_hash::StartingSnapshotHashes,
snapshot_minimizer::SnapshotMinimizer,
snapshot_utils::{
self, clean_orphaned_account_snapshot_dirs, create_all_accounts_run_and_snapshot_dirs,
move_and_async_delete_path, ArchiveFormat, SnapshotVersion,
DEFAULT_ARCHIVE_COMPRESSION, SUPPORTED_ARCHIVE_COMPRESSION,
self, ArchiveFormat, SnapshotVersion, DEFAULT_ARCHIVE_COMPRESSION,
SUPPORTED_ARCHIVE_COMPRESSION,
},
},
solana_sdk::{
@ -87,24 +66,20 @@ use {
clock::{Epoch, Slot},
feature::{self, Feature},
feature_set::{self, FeatureSet},
genesis_config::{ClusterType, GenesisConfig},
genesis_config::ClusterType,
hash::Hash,
inflation::Inflation,
native_token::{lamports_to_sol, sol_to_lamports, Sol},
pubkey::Pubkey,
rent::Rent,
shred_version::compute_shred_version,
signature::Signer,
signer::keypair::Keypair,
stake::{self, state::StakeState},
system_program,
timing::timestamp,
transaction::{
MessageHash, SanitizedTransaction, SimpleAddressLoader, VersionedTransaction,
},
},
solana_stake_program::stake_state::{self, PointValue},
solana_streamer::socket::SocketAddrSpace,
solana_vote_program::{
self,
vote_state::{self, VoteState},
@ -126,8 +101,10 @@ use {
},
};
mod args;
mod bigtable;
mod ledger_path;
mod ledger_utils;
mod output;
#[derive(PartialEq, Eq)]
@ -850,121 +827,6 @@ fn analyze_storage(database: &Database) {
analyze_column::<OptimisticSlots>(database, "OptimisticSlots");
}
/// Open blockstore with temporary primary access to allow necessary,
/// persistent changes to be made to the blockstore (such as creation of new
/// column family(s)). Then, continue opening with `original_access_type`
fn open_blockstore_with_temporary_primary_access(
ledger_path: &Path,
original_access_type: AccessType,
wal_recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Blockstore, BlockstoreError> {
// Opening with Primary access will allow any configuration that automatically
// updates to take effect
info!("Attempting to temporarily open blockstore with Primary access in order to update");
{
let _ = Blockstore::open_with_options(
ledger_path,
BlockstoreOptions {
access_type: AccessType::PrimaryForMaintenance,
recovery_mode: wal_recovery_mode.clone(),
enforce_ulimit_nofile: true,
..BlockstoreOptions::default()
},
)?;
}
// Now, attempt to open the blockstore with original AccessType
info!(
"Blockstore forced open succeeded, retrying with original access: {:?}",
original_access_type
);
Blockstore::open_with_options(
ledger_path,
BlockstoreOptions {
access_type: original_access_type,
recovery_mode: wal_recovery_mode,
enforce_ulimit_nofile: true,
..BlockstoreOptions::default()
},
)
}
fn get_shred_storage_type(ledger_path: &Path, message: &str) -> ShredStorageType {
// TODO: the following shred_storage_type inference must be updated once
// the rocksdb options can be constructed via load_options_file() as the
// value picked by passing None for `max_shred_storage_size` could affect
// the persisted rocksdb options file.
match ShredStorageType::from_ledger_path(ledger_path, None) {
Some(s) => s,
None => {
info!("{}", message);
ShredStorageType::RocksLevel
}
}
}
fn open_blockstore(
ledger_path: &Path,
access_type: AccessType,
wal_recovery_mode: Option<BlockstoreRecoveryMode>,
force_update_to_open: bool,
) -> Blockstore {
let shred_storage_type = get_shred_storage_type(
ledger_path,
&format!(
"Shred stroage type cannot be inferred for ledger at {ledger_path:?}, \
using default RocksLevel",
),
);
match Blockstore::open_with_options(
ledger_path,
BlockstoreOptions {
access_type: access_type.clone(),
recovery_mode: wal_recovery_mode.clone(),
enforce_ulimit_nofile: true,
column_options: LedgerColumnOptions {
shred_storage_type,
..LedgerColumnOptions::default()
},
},
) {
Ok(blockstore) => blockstore,
Err(BlockstoreError::RocksDb(err))
if (err
.to_string()
// Missing column family
.starts_with("Invalid argument: Column family not found:")
|| err
.to_string()
// Missing essential file, indicative of blockstore not existing
.starts_with("IO error: No such file or directory:"))
&& access_type == AccessType::Secondary =>
{
error!("Blockstore is incompatible with current software and requires updates");
if !force_update_to_open {
error!("Use --force-update-to-open to allow blockstore to update");
exit(1);
}
open_blockstore_with_temporary_primary_access(
ledger_path,
access_type,
wal_recovery_mode,
)
.unwrap_or_else(|err| {
error!(
"Failed to open blockstore (with --force-update-to-open) at {:?}: {:?}",
ledger_path, err
);
exit(1);
})
}
Err(err) => {
eprintln!("Failed to open blockstore at {ledger_path:?}: {err:?}");
exit(1);
}
}
}
fn raw_key_to_slot(key: &[u8], column_name: &str) -> Option<Slot> {
match column_name {
cf::SlotMeta::NAME => Some(cf::SlotMeta::slot(cf::SlotMeta::index(key))),
@ -1032,328 +894,6 @@ fn print_blockstore_file_metadata(
Ok(())
}
// This function is duplicated in validator/src/main.rs...
fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option<Vec<Slot>> {
if matches.is_present(name) {
Some(values_t_or_exit!(matches, name, Slot))
} else {
None
}
}
// Build an `AccountsDbConfig` from subcommand arguments. All of the arguments
// matched by this function are either optional or have a default value.
// Thus, a subcommand need not support all of the arguments that are matched
// by this function.
fn get_accounts_db_config(ledger_path: &Path, arg_matches: &ArgMatches<'_>) -> AccountsDbConfig {
let accounts_index_bins = value_t!(arg_matches, "accounts_index_bins", usize).ok();
let accounts_index_index_limit_mb =
if let Some(limit) = value_t!(arg_matches, "accounts_index_memory_limit_mb", usize).ok() {
IndexLimitMb::Limit(limit)
} else if arg_matches.is_present("disable_accounts_disk_index") {
IndexLimitMb::InMemOnly
} else {
IndexLimitMb::Unspecified
};
let accounts_index_drives: Vec<PathBuf> = if arg_matches.is_present("accounts_index_path") {
values_t_or_exit!(arg_matches, "accounts_index_path", String)
.into_iter()
.map(PathBuf::from)
.collect()
} else {
vec![ledger_path.join("accounts_index.ledger-tool")]
};
let accounts_index_config = AccountsIndexConfig {
bins: accounts_index_bins,
index_limit_mb: accounts_index_index_limit_mb,
drives: Some(accounts_index_drives),
..AccountsIndexConfig::default()
};
let filler_accounts_config = FillerAccountsConfig {
count: value_t!(arg_matches, "accounts_filler_count", usize).unwrap_or(0),
size: value_t!(arg_matches, "accounts_filler_size", usize).unwrap_or(0),
};
AccountsDbConfig {
index: Some(accounts_index_config),
accounts_hash_cache_path: Some(ledger_path.join(AccountsDb::ACCOUNTS_HASH_CACHE_DIR)),
filler_accounts_config,
ancient_append_vec_offset: value_t!(arg_matches, "accounts_db_ancient_append_vecs", i64)
.ok(),
exhaustively_verify_refcounts: arg_matches.is_present("accounts_db_verify_refcounts"),
skip_initial_hash_calc: arg_matches.is_present("accounts_db_skip_initial_hash_calculation"),
..AccountsDbConfig::default()
}
}
fn load_bank_forks(
arg_matches: &ArgMatches,
genesis_config: &GenesisConfig,
blockstore: Arc<Blockstore>,
process_options: ProcessOptions,
snapshot_archive_path: Option<PathBuf>,
incremental_snapshot_archive_path: Option<PathBuf>,
) -> Result<(Arc<RwLock<BankForks>>, Option<StartingSnapshotHashes>), BlockstoreProcessorError> {
let bank_snapshots_dir = blockstore
.ledger_path()
.join(if blockstore.is_primary_access() {
"snapshot"
} else {
"snapshot.ledger-tool"
});
let mut starting_slot = 0; // default start check with genesis
let snapshot_config = if arg_matches.is_present("no_snapshot") {
None
} else {
let full_snapshot_archives_dir =
snapshot_archive_path.unwrap_or_else(|| blockstore.ledger_path().to_path_buf());
let incremental_snapshot_archives_dir =
incremental_snapshot_archive_path.unwrap_or_else(|| full_snapshot_archives_dir.clone());
if let Some(full_snapshot_slot) =
snapshot_utils::get_highest_full_snapshot_archive_slot(&full_snapshot_archives_dir)
{
let incremental_snapshot_slot =
snapshot_utils::get_highest_incremental_snapshot_archive_slot(
&incremental_snapshot_archives_dir,
full_snapshot_slot,
)
.unwrap_or_default();
starting_slot = std::cmp::max(full_snapshot_slot, incremental_snapshot_slot);
}
Some(SnapshotConfig {
full_snapshot_archives_dir,
incremental_snapshot_archives_dir,
bank_snapshots_dir: bank_snapshots_dir.clone(),
..SnapshotConfig::new_load_only()
})
};
match process_options.halt_at_slot {
// Skip the following checks for sentinel values of Some(0) and None.
// For Some(0), no slots will be replayed after starting_slot.
// For None, all available children of starting_slot will be replayed.
None | Some(0) => {}
Some(halt_slot) => {
if halt_slot < starting_slot {
eprintln!(
"Unable to load bank forks at slot {halt_slot} because it is less than the starting slot {starting_slot}. \
The starting slot will be the latest snapshot slot, or genesis if the --no-snapshot flag is specified or no snapshots are found."
);
exit(1);
}
// Check if we have the slot data necessary to replay from starting_slot to >= halt_slot.
if !blockstore.slot_range_connected(starting_slot, halt_slot) {
eprintln!(
"Unable to load bank forks at slot {halt_slot} due to disconnected blocks.",
);
exit(1);
}
}
}
let account_paths = if let Some(account_paths) = arg_matches.value_of("account_paths") {
// If this blockstore access is Primary, no other process (solana-validator) can hold
// Primary access. So, allow a custom accounts path without worry of wiping the accounts
// of solana-validator.
if !blockstore.is_primary_access() {
// Attempt to open the Blockstore in Primary access; if successful, no other process
// was holding Primary so allow things to proceed with custom accounts path. Release
// the Primary access instead of holding it to give priority to solana-validator over
// solana-ledger-tool should solana-validator start before we've finished.
info!(
"Checking if another process currently holding Primary access to {:?}",
blockstore.ledger_path()
);
if Blockstore::open_with_options(
blockstore.ledger_path(),
BlockstoreOptions {
access_type: AccessType::PrimaryForMaintenance,
..BlockstoreOptions::default()
},
)
.is_err()
{
// Couldn't get Primary access, error out to be defensive.
eprintln!("Error: custom accounts path is not supported under secondary access");
exit(1);
}
}
account_paths.split(',').map(PathBuf::from).collect()
} else if blockstore.is_primary_access() {
vec![blockstore.ledger_path().join("accounts")]
} else {
let non_primary_accounts_path = blockstore.ledger_path().join("accounts.ledger-tool");
info!(
"Default accounts path is switched aligning with Blockstore's secondary access: {:?}",
non_primary_accounts_path
);
vec![non_primary_accounts_path]
};
let (account_run_paths, account_snapshot_paths) =
create_all_accounts_run_and_snapshot_dirs(&account_paths).unwrap_or_else(|err| {
eprintln!("Error: {err:?}");
exit(1);
});
// From now on, use run/ paths in the same way as the previous account_paths.
let account_paths = account_run_paths;
info!("Cleaning contents of account paths: {:?}", account_paths);
let mut measure = Measure::start("clean_accounts_paths");
account_paths.iter().for_each(|path| {
if path.exists() {
move_and_async_delete_path(path);
}
});
measure.stop();
info!("done. {}", measure);
info!(
"Cleaning contents of account snapshot paths: {:?}",
account_snapshot_paths
);
if let Err(e) =
clean_orphaned_account_snapshot_dirs(&bank_snapshots_dir, &account_snapshot_paths)
{
eprintln!("Failed to clean orphaned account snapshot dirs. Error: {e:?}");
exit(1);
}
let mut accounts_update_notifier = Option::<AccountsUpdateNotifier>::default();
let mut transaction_notifier = Option::<TransactionNotifierLock>::default();
if arg_matches.is_present("geyser_plugin_config") {
let geyser_config_files = values_t_or_exit!(arg_matches, "geyser_plugin_config", String)
.into_iter()
.map(PathBuf::from)
.collect::<Vec<_>>();
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
drop(confirmed_bank_sender);
let geyser_service =
GeyserPluginService::new(confirmed_bank_receiver, &geyser_config_files).unwrap_or_else(
|err| {
eprintln!("Failed to setup Geyser service: {err:?}");
exit(1);
},
);
accounts_update_notifier = geyser_service.get_accounts_update_notifier();
transaction_notifier = geyser_service.get_transaction_notifier();
}
let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) =
bank_forks_utils::load_bank_forks(
genesis_config,
blockstore.as_ref(),
account_paths,
None,
snapshot_config.as_ref(),
&process_options,
None,
accounts_update_notifier,
&Arc::default(),
);
let block_verification_method = value_t!(
arg_matches,
"block_verification_method",
BlockVerificationMethod
)
.unwrap_or_default();
info!(
"Using: block-verification-method: {}",
block_verification_method,
);
let exit = Arc::new(AtomicBool::new(false));
let node_id = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
ContactInfo::new_localhost(&node_id.pubkey(), timestamp()),
Arc::clone(&node_id),
SocketAddrSpace::Unspecified,
));
let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded();
let accounts_hash_verifier = AccountsHashVerifier::new(
accounts_package_sender.clone(),
accounts_package_receiver,
None,
exit.clone(),
cluster_info,
None,
false,
None,
SnapshotConfig::new_load_only(),
);
let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender.clone());
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: SnapshotConfig::new_load_only(),
snapshot_request_sender,
snapshot_request_receiver,
accounts_package_sender,
};
let pruned_banks_receiver =
AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
let pruned_banks_request_handler = PrunedBanksRequestHandler {
pruned_banks_receiver,
};
let abs_request_handler = AbsRequestHandlers {
snapshot_request_handler,
pruned_banks_request_handler,
};
let accounts_background_service = AccountsBackgroundService::new(
bank_forks.clone(),
exit.clone(),
abs_request_handler,
process_options.accounts_db_test_hash_calculation,
None,
);
let (transaction_status_sender, transaction_status_service) = if transaction_notifier.is_some()
{
let (transaction_status_sender, transaction_status_receiver) = unbounded();
let transaction_status_service = TransactionStatusService::new(
transaction_status_receiver,
Arc::default(),
false,
transaction_notifier,
blockstore.clone(),
false,
&exit,
);
(
Some(TransactionStatusSender {
sender: transaction_status_sender,
}),
Some(transaction_status_service),
)
} else {
(None, None)
};
let result = blockstore_processor::process_blockstore_from_root(
blockstore.as_ref(),
&bank_forks,
&leader_schedule_cache,
&process_options,
transaction_status_sender.as_ref(),
None,
&accounts_background_request_sender,
)
.map(|_| (bank_forks, starting_snapshot_hashes));
exit.store(true, Ordering::Relaxed);
accounts_background_service.join().unwrap();
accounts_hash_verifier.join().unwrap();
if let Some(service) = transaction_status_service {
service.join().unwrap();
}
result
}
fn compute_slot_cost(blockstore: &Blockstore, slot: Slot) -> Result<(), String> {
if blockstore.is_dead(slot) {
return Err("Dead slot".to_string());
@ -1413,12 +953,6 @@ fn compute_slot_cost(blockstore: &Blockstore, slot: Slot) -> Result<(), String>
Ok(())
}
fn open_genesis_config_by(ledger_path: &Path, matches: &ArgMatches<'_>) -> GenesisConfig {
let max_genesis_archive_unpacked_size =
value_t_or_exit!(matches, "max_genesis_archive_unpacked_size", u64);
open_genesis_config(ledger_path, max_genesis_archive_unpacked_size)
}
/// Finds the accounts needed to replay slots `snapshot_slot` to `ending_slot`.
/// Removes all other accounts from accounts_db, and updates the accounts hash
/// and capitalization. This is used by the --minimize option in create-snapshot