Removes holding storages in AccountsHashVerifier for fastboot (#120)
This commit is contained in:
parent
27e51b3196
commit
096a1f4e5c
|
@ -37,7 +37,6 @@ pub mod secondary_index;
|
|||
pub mod shared_buffer_reader;
|
||||
pub mod sorted_storages;
|
||||
pub mod stake_rewards;
|
||||
pub mod starting_snapshot_storages;
|
||||
pub mod storable_accounts;
|
||||
pub mod tiered_storage;
|
||||
pub mod utils;
|
||||
|
|
|
@ -1,19 +0,0 @@
|
|||
use {crate::accounts_db::AccountStorageEntry, std::sync::Arc};
|
||||
|
||||
/// Snapshot storages that the node loaded from
|
||||
///
|
||||
/// This is used to support fastboot. Since fastboot reuses existing storages, we must carefully
|
||||
/// handle the storages used to load at startup. If we do not handle these storages properly,
|
||||
/// restarting from the same local state (i.e. bank snapshot) may fail.
|
||||
#[derive(Debug)]
|
||||
pub enum StartingSnapshotStorages {
|
||||
/// Starting from genesis has no storages yet
|
||||
Genesis,
|
||||
/// Starting from a snapshot archive always extracts the storages from the archive, so no
|
||||
/// special handling is necessary to preserve them.
|
||||
Archive,
|
||||
/// Starting from local state must preserve the loaded storages. These storages must *not* be
|
||||
/// recycled or removed prior to taking the next snapshot, otherwise restarting from the same
|
||||
/// bank snapshot may fail.
|
||||
Fastboot(Vec<Arc<AccountStorageEntry>>),
|
||||
}
|
|
@ -9,7 +9,6 @@ use {
|
|||
IncrementalAccountsHash,
|
||||
},
|
||||
sorted_storages::SortedStorages,
|
||||
starting_snapshot_storages::StartingSnapshotStorages,
|
||||
},
|
||||
solana_measure::measure_us,
|
||||
solana_runtime::{
|
||||
|
@ -43,7 +42,6 @@ impl AccountsHashVerifier {
|
|||
accounts_package_sender: Sender<AccountsPackage>,
|
||||
accounts_package_receiver: Receiver<AccountsPackage>,
|
||||
snapshot_package_sender: Option<Sender<SnapshotPackage>>,
|
||||
starting_snapshot_storages: StartingSnapshotStorages,
|
||||
exit: Arc<AtomicBool>,
|
||||
snapshot_config: SnapshotConfig,
|
||||
) -> Self {
|
||||
|
@ -53,14 +51,6 @@ impl AccountsHashVerifier {
|
|||
.name("solAcctHashVer".to_string())
|
||||
.spawn(move || {
|
||||
info!("AccountsHashVerifier has started");
|
||||
// To support fastboot, we must ensure the storages used in the latest POST snapshot are
|
||||
// not recycled nor removed early. Hold an Arc of their AppendVecs to prevent them from
|
||||
// expiring.
|
||||
let mut fastboot_storages = match starting_snapshot_storages {
|
||||
StartingSnapshotStorages::Genesis => None,
|
||||
StartingSnapshotStorages::Archive => None,
|
||||
StartingSnapshotStorages::Fastboot(storages) => Some(storages),
|
||||
};
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
|
@ -81,14 +71,6 @@ impl AccountsHashVerifier {
|
|||
info!("handling accounts package: {accounts_package:?}");
|
||||
let enqueued_time = accounts_package.enqueued.elapsed();
|
||||
|
||||
// If this accounts package is for a snapshot, then clone the storages to
|
||||
// save for fastboot.
|
||||
let snapshot_storages_for_fastboot = accounts_package
|
||||
.snapshot_info
|
||||
.is_some()
|
||||
.then(|| accounts_package.snapshot_storages.clone());
|
||||
|
||||
let slot = accounts_package.slot;
|
||||
let (_, handling_time_us) = measure_us!(Self::process_accounts_package(
|
||||
accounts_package,
|
||||
snapshot_package_sender.as_ref(),
|
||||
|
@ -96,25 +78,6 @@ impl AccountsHashVerifier {
|
|||
&exit,
|
||||
));
|
||||
|
||||
if let Some(snapshot_storages_for_fastboot) = snapshot_storages_for_fastboot {
|
||||
// Get the number of storages that are being kept alive for fastboot.
|
||||
// Looking at the storage Arc's strong reference count, we know that one
|
||||
// ref is for fastboot, and one ref is for snapshot packaging. If there
|
||||
// are no others, then the storage will be kept alive because of fastboot.
|
||||
let num_storages_kept_alive = snapshot_storages_for_fastboot
|
||||
.iter()
|
||||
.filter(|storage| Arc::strong_count(storage) == 2)
|
||||
.count();
|
||||
let num_storages_total = snapshot_storages_for_fastboot.len();
|
||||
fastboot_storages = Some(snapshot_storages_for_fastboot);
|
||||
datapoint_info!(
|
||||
"fastboot",
|
||||
("slot", slot, i64),
|
||||
("num_storages_total", num_storages_total, i64),
|
||||
("num_storages_kept_alive", num_storages_kept_alive, i64),
|
||||
);
|
||||
}
|
||||
|
||||
datapoint_info!(
|
||||
"accounts_hash_verifier",
|
||||
(
|
||||
|
@ -132,13 +95,6 @@ impl AccountsHashVerifier {
|
|||
);
|
||||
}
|
||||
info!("AccountsHashVerifier has stopped");
|
||||
debug!(
|
||||
"Number of storages kept alive for fastboot: {}",
|
||||
fastboot_storages
|
||||
.as_ref()
|
||||
.map(|storages| storages.len())
|
||||
.unwrap_or(0)
|
||||
);
|
||||
})
|
||||
.unwrap();
|
||||
Self {
|
||||
|
|
|
@ -35,7 +35,6 @@ use {
|
|||
accounts_index::AccountSecondaryIndexes,
|
||||
accounts_update_notifier_interface::AccountsUpdateNotifier,
|
||||
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
|
||||
starting_snapshot_storages::StartingSnapshotStorages,
|
||||
utils::{move_and_async_delete_path, move_and_async_delete_path_contents},
|
||||
},
|
||||
solana_client::connection_cache::{ConnectionCache, Protocol},
|
||||
|
@ -691,7 +690,6 @@ impl Validator {
|
|||
completed_slots_receiver,
|
||||
leader_schedule_cache,
|
||||
starting_snapshot_hashes,
|
||||
starting_snapshot_storages,
|
||||
TransactionHistoryServices {
|
||||
transaction_status_sender,
|
||||
transaction_status_service,
|
||||
|
@ -781,7 +779,6 @@ impl Validator {
|
|||
accounts_package_sender.clone(),
|
||||
accounts_package_receiver,
|
||||
snapshot_package_sender,
|
||||
starting_snapshot_storages,
|
||||
exit.clone(),
|
||||
config.snapshot_config.clone(),
|
||||
);
|
||||
|
@ -1770,7 +1767,6 @@ fn load_blockstore(
|
|||
CompletedSlotsReceiver,
|
||||
LeaderScheduleCache,
|
||||
Option<StartingSnapshotHashes>,
|
||||
StartingSnapshotStorages,
|
||||
TransactionHistoryServices,
|
||||
blockstore_processor::ProcessOptions,
|
||||
BlockstoreRootScan,
|
||||
|
@ -1860,27 +1856,23 @@ fn load_blockstore(
|
|||
let entry_notifier_service = entry_notifier
|
||||
.map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit.clone()));
|
||||
|
||||
let (
|
||||
bank_forks,
|
||||
mut leader_schedule_cache,
|
||||
starting_snapshot_hashes,
|
||||
starting_snapshot_storages,
|
||||
) = bank_forks_utils::load_bank_forks(
|
||||
&genesis_config,
|
||||
&blockstore,
|
||||
config.account_paths.clone(),
|
||||
Some(&config.snapshot_config),
|
||||
&process_options,
|
||||
transaction_history_services
|
||||
.cache_block_meta_sender
|
||||
.as_ref(),
|
||||
entry_notifier_service
|
||||
.as_ref()
|
||||
.map(|service| service.sender()),
|
||||
accounts_update_notifier,
|
||||
exit,
|
||||
)
|
||||
.map_err(|err| err.to_string())?;
|
||||
let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
|
||||
bank_forks_utils::load_bank_forks(
|
||||
&genesis_config,
|
||||
&blockstore,
|
||||
config.account_paths.clone(),
|
||||
Some(&config.snapshot_config),
|
||||
&process_options,
|
||||
transaction_history_services
|
||||
.cache_block_meta_sender
|
||||
.as_ref(),
|
||||
entry_notifier_service
|
||||
.as_ref()
|
||||
.map(|service| service.sender()),
|
||||
accounts_update_notifier,
|
||||
exit,
|
||||
)
|
||||
.map_err(|err| err.to_string())?;
|
||||
|
||||
// Before replay starts, set the callbacks in each of the banks in BankForks so that
|
||||
// all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
|
||||
|
@ -1906,7 +1898,6 @@ fn load_blockstore(
|
|||
completed_slots_receiver,
|
||||
leader_schedule_cache,
|
||||
starting_snapshot_hashes,
|
||||
starting_snapshot_storages,
|
||||
transaction_history_services,
|
||||
process_options,
|
||||
blockstore_root_scan,
|
||||
|
|
|
@ -9,7 +9,6 @@ use {
|
|||
accounts_hash::CalcAccountsHashConfig,
|
||||
accounts_index::AccountSecondaryIndexes,
|
||||
epoch_accounts_hash::EpochAccountsHash,
|
||||
starting_snapshot_storages::StartingSnapshotStorages,
|
||||
},
|
||||
solana_core::{
|
||||
accounts_hash_verifier::AccountsHashVerifier,
|
||||
|
@ -197,7 +196,6 @@ impl BackgroundServices {
|
|||
accounts_package_sender.clone(),
|
||||
accounts_package_receiver,
|
||||
Some(snapshot_package_sender),
|
||||
StartingSnapshotStorages::Genesis,
|
||||
exit.clone(),
|
||||
snapshot_config.clone(),
|
||||
);
|
||||
|
|
|
@ -11,7 +11,6 @@ use {
|
|||
accounts_hash::AccountsHash,
|
||||
accounts_index::AccountSecondaryIndexes,
|
||||
epoch_accounts_hash::EpochAccountsHash,
|
||||
starting_snapshot_storages::StartingSnapshotStorages,
|
||||
},
|
||||
solana_core::{
|
||||
accounts_hash_verifier::AccountsHashVerifier,
|
||||
|
@ -1044,7 +1043,6 @@ fn test_snapshots_with_background_services(
|
|||
accounts_package_sender,
|
||||
accounts_package_receiver,
|
||||
Some(snapshot_package_sender),
|
||||
StartingSnapshotStorages::Genesis,
|
||||
exit.clone(),
|
||||
snapshot_test_config.snapshot_config.clone(),
|
||||
);
|
||||
|
|
|
@ -268,24 +268,19 @@ pub fn load_and_process_ledger(
|
|||
};
|
||||
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let (
|
||||
bank_forks,
|
||||
leader_schedule_cache,
|
||||
starting_snapshot_hashes,
|
||||
starting_snapshot_storages,
|
||||
..,
|
||||
) = bank_forks_utils::load_bank_forks(
|
||||
genesis_config,
|
||||
blockstore.as_ref(),
|
||||
account_paths,
|
||||
snapshot_config.as_ref(),
|
||||
&process_options,
|
||||
None,
|
||||
None, // Maybe support this later, though
|
||||
accounts_update_notifier,
|
||||
exit.clone(),
|
||||
)
|
||||
.map_err(LoadAndProcessLedgerError::LoadBankForks)?;
|
||||
let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) =
|
||||
bank_forks_utils::load_bank_forks(
|
||||
genesis_config,
|
||||
blockstore.as_ref(),
|
||||
account_paths,
|
||||
snapshot_config.as_ref(),
|
||||
&process_options,
|
||||
None,
|
||||
None, // Maybe support this later, though
|
||||
accounts_update_notifier,
|
||||
exit.clone(),
|
||||
)
|
||||
.map_err(LoadAndProcessLedgerError::LoadBankForks)?;
|
||||
let block_verification_method = value_t!(
|
||||
arg_matches,
|
||||
"block_verification_method",
|
||||
|
@ -330,7 +325,6 @@ pub fn load_and_process_ledger(
|
|||
accounts_package_sender.clone(),
|
||||
accounts_package_receiver,
|
||||
None,
|
||||
starting_snapshot_storages,
|
||||
exit.clone(),
|
||||
SnapshotConfig::new_load_only(),
|
||||
);
|
||||
|
|
|
@ -10,10 +10,7 @@ use {
|
|||
use_snapshot_archives_at_startup::{self, UseSnapshotArchivesAtStartup},
|
||||
},
|
||||
log::*,
|
||||
solana_accounts_db::{
|
||||
accounts_update_notifier_interface::AccountsUpdateNotifier,
|
||||
starting_snapshot_storages::StartingSnapshotStorages,
|
||||
},
|
||||
solana_accounts_db::accounts_update_notifier_interface::AccountsUpdateNotifier,
|
||||
solana_runtime::{
|
||||
accounts_background_service::AbsRequestSender,
|
||||
bank_forks::BankForks,
|
||||
|
@ -70,7 +67,6 @@ pub type LoadResult = result::Result<
|
|||
Arc<RwLock<BankForks>>,
|
||||
LeaderScheduleCache,
|
||||
Option<StartingSnapshotHashes>,
|
||||
StartingSnapshotStorages,
|
||||
),
|
||||
BankForksUtilsError,
|
||||
>;
|
||||
|
@ -92,13 +88,7 @@ pub fn load(
|
|||
accounts_update_notifier: Option<AccountsUpdateNotifier>,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> LoadResult {
|
||||
let (
|
||||
bank_forks,
|
||||
leader_schedule_cache,
|
||||
starting_snapshot_hashes,
|
||||
starting_snapshot_storages,
|
||||
..,
|
||||
) = load_bank_forks(
|
||||
let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) = load_bank_forks(
|
||||
genesis_config,
|
||||
blockstore,
|
||||
account_paths,
|
||||
|
@ -121,12 +111,7 @@ pub fn load(
|
|||
)
|
||||
.map_err(BankForksUtilsError::ProcessBlockstoreFromRoot)?;
|
||||
|
||||
Ok((
|
||||
bank_forks,
|
||||
leader_schedule_cache,
|
||||
starting_snapshot_hashes,
|
||||
starting_snapshot_storages,
|
||||
))
|
||||
Ok((bank_forks, leader_schedule_cache, starting_snapshot_hashes))
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
|
@ -176,7 +161,7 @@ pub fn load_bank_forks(
|
|||
))
|
||||
}
|
||||
|
||||
let (bank_forks, starting_snapshot_hashes, starting_snapshot_storages) =
|
||||
let (bank_forks, starting_snapshot_hashes) =
|
||||
if let Some((full_snapshot_archive_info, incremental_snapshot_archive_info)) =
|
||||
get_snapshots_to_load(snapshot_config)
|
||||
{
|
||||
|
@ -188,22 +173,17 @@ pub fn load_bank_forks(
|
|||
);
|
||||
std::fs::create_dir_all(&snapshot_config.bank_snapshots_dir)
|
||||
.expect("create bank snapshots dir");
|
||||
let (bank_forks, starting_snapshot_hashes, starting_snapshot_storages) =
|
||||
bank_forks_from_snapshot(
|
||||
full_snapshot_archive_info,
|
||||
incremental_snapshot_archive_info,
|
||||
genesis_config,
|
||||
account_paths,
|
||||
snapshot_config,
|
||||
process_options,
|
||||
accounts_update_notifier,
|
||||
exit,
|
||||
)?;
|
||||
(
|
||||
bank_forks,
|
||||
Some(starting_snapshot_hashes),
|
||||
starting_snapshot_storages,
|
||||
)
|
||||
let (bank_forks, starting_snapshot_hashes) = bank_forks_from_snapshot(
|
||||
full_snapshot_archive_info,
|
||||
incremental_snapshot_archive_info,
|
||||
genesis_config,
|
||||
account_paths,
|
||||
snapshot_config,
|
||||
process_options,
|
||||
accounts_update_notifier,
|
||||
exit,
|
||||
)?;
|
||||
(bank_forks, Some(starting_snapshot_hashes))
|
||||
} else {
|
||||
info!("Processing ledger from genesis");
|
||||
let bank_forks = blockstore_processor::process_blockstore_for_bank_0(
|
||||
|
@ -222,7 +202,7 @@ pub fn load_bank_forks(
|
|||
.root_bank()
|
||||
.set_startup_verification_complete();
|
||||
|
||||
(bank_forks, None, StartingSnapshotStorages::Genesis)
|
||||
(bank_forks, None)
|
||||
};
|
||||
|
||||
let mut leader_schedule_cache =
|
||||
|
@ -238,12 +218,7 @@ pub fn load_bank_forks(
|
|||
.for_each(|hard_fork_slot| root_bank.register_hard_fork(*hard_fork_slot));
|
||||
}
|
||||
|
||||
Ok((
|
||||
bank_forks,
|
||||
leader_schedule_cache,
|
||||
starting_snapshot_hashes,
|
||||
starting_snapshot_storages,
|
||||
))
|
||||
Ok((bank_forks, leader_schedule_cache, starting_snapshot_hashes))
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
|
@ -256,14 +231,7 @@ fn bank_forks_from_snapshot(
|
|||
process_options: &ProcessOptions,
|
||||
accounts_update_notifier: Option<AccountsUpdateNotifier>,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> Result<
|
||||
(
|
||||
Arc<RwLock<BankForks>>,
|
||||
StartingSnapshotHashes,
|
||||
StartingSnapshotStorages,
|
||||
),
|
||||
BankForksUtilsError,
|
||||
> {
|
||||
) -> Result<(Arc<RwLock<BankForks>>, StartingSnapshotHashes), BankForksUtilsError> {
|
||||
// Fail hard here if snapshot fails to load, don't silently continue
|
||||
if account_paths.is_empty() {
|
||||
return Err(BankForksUtilsError::AccountPathsNotPresent);
|
||||
|
@ -289,7 +257,7 @@ fn bank_forks_from_snapshot(
|
|||
.unwrap_or(true),
|
||||
};
|
||||
|
||||
let (bank, starting_snapshot_storages) = if will_startup_from_snapshot_archives {
|
||||
let bank = if will_startup_from_snapshot_archives {
|
||||
// Given that we are going to boot from an archive, the append vecs held in the snapshot dirs for fast-boot should
|
||||
// be released. They will be released by the account_background_service anyway. But in the case of the account_paths
|
||||
// using memory-mounted file system, they are not released early enough to give space for the new append-vecs from
|
||||
|
@ -324,7 +292,7 @@ fn bank_forks_from_snapshot(
|
|||
.map(|archive| archive.path().display().to_string())
|
||||
.unwrap_or("none".to_string()),
|
||||
})?;
|
||||
(bank, StartingSnapshotStorages::Archive)
|
||||
bank
|
||||
} else {
|
||||
let bank_snapshot =
|
||||
latest_bank_snapshot.ok_or_else(|| BankForksUtilsError::NoBankSnapshotDirectory {
|
||||
|
@ -378,8 +346,7 @@ fn bank_forks_from_snapshot(
|
|||
// snapshot archive next time, which is safe.
|
||||
snapshot_utils::purge_all_bank_snapshots(&snapshot_config.bank_snapshots_dir);
|
||||
|
||||
let storages = bank.get_snapshot_storages(None);
|
||||
(bank, StartingSnapshotStorages::Fastboot(storages))
|
||||
bank
|
||||
};
|
||||
|
||||
let full_snapshot_hash = FullSnapshotHash((
|
||||
|
@ -398,9 +365,5 @@ fn bank_forks_from_snapshot(
|
|||
incremental: incremental_snapshot_hash,
|
||||
};
|
||||
|
||||
Ok((
|
||||
BankForks::new_rw_arc(bank),
|
||||
starting_snapshot_hashes,
|
||||
starting_snapshot_storages,
|
||||
))
|
||||
Ok((BankForks::new_rw_arc(bank), starting_snapshot_hashes))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue