Make `solana-ledger-tool` run AccountsBackgroundService (#26914)
Prior to this change, long running commands like `solana-ledger-tool verify` would OOM due to AccountsDb cleanup not happening. Co-authored-by: Michael Vines <mvines@gmail.com>
This commit is contained in:
parent
0f4b858224
commit
300666dce7
|
@ -116,9 +116,6 @@ use {
|
|||
const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
|
||||
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
|
||||
|
||||
/// maximum drop bank signal queue length
|
||||
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
|
||||
|
||||
pub struct ValidatorConfig {
|
||||
pub halt_at_slot: Option<Slot>,
|
||||
pub expected_genesis_hash: Option<Hash>,
|
||||
|
@ -1468,22 +1465,8 @@ fn load_blockstore(
|
|||
// drop behavior can be safely synchronized with any other ongoing accounts activity like
|
||||
// cache flush, clean, shrink, as long as the same thread performing those activities also
|
||||
// is processing the dropped banks from the `pruned_banks_receiver` channel.
|
||||
|
||||
// There should only be one bank, the root bank in BankForks. Thus all banks added to
|
||||
// BankForks from now on will be descended from the root bank and thus will inherit
|
||||
// the bank drop callback.
|
||||
assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
|
||||
let (pruned_banks_sender, pruned_banks_receiver) = bounded(MAX_DROP_BANK_SIGNAL_QUEUE_SIZE);
|
||||
{
|
||||
let root_bank = bank_forks.read().unwrap().root_bank();
|
||||
root_bank.set_callback(Some(Box::new(
|
||||
root_bank
|
||||
.rc
|
||||
.accounts
|
||||
.accounts_db
|
||||
.create_drop_bank_callback(pruned_banks_sender),
|
||||
)));
|
||||
}
|
||||
let pruned_banks_receiver =
|
||||
AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
|
||||
|
||||
{
|
||||
let hard_forks: Vec<_> = bank_forks
|
||||
|
|
|
@ -29,11 +29,14 @@ use {
|
|||
AccessType, BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions,
|
||||
ShredStorageType,
|
||||
},
|
||||
blockstore_processor::{BlockstoreProcessorError, ProcessOptions},
|
||||
blockstore_processor::{self, BlockstoreProcessorError, ProcessOptions},
|
||||
shred::Shred,
|
||||
},
|
||||
solana_measure::{measure, measure::Measure},
|
||||
solana_runtime::{
|
||||
accounts_background_service::{
|
||||
AbsRequestHandler, AbsRequestSender, AccountsBackgroundService,
|
||||
},
|
||||
accounts_db::{AccountsDbConfig, FillerAccountsConfig},
|
||||
accounts_index::{AccountsIndexConfig, IndexLimitMb, ScanConfig},
|
||||
bank::{Bank, RewardCalculationEvent},
|
||||
|
@ -923,18 +926,49 @@ fn load_bank_forks(
|
|||
vec![non_primary_accounts_path]
|
||||
};
|
||||
|
||||
bank_forks_utils::load(
|
||||
genesis_config,
|
||||
let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) =
|
||||
bank_forks_utils::load_bank_forks(
|
||||
genesis_config,
|
||||
blockstore,
|
||||
account_paths,
|
||||
None,
|
||||
snapshot_config.as_ref(),
|
||||
&process_options,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
let pruned_banks_receiver =
|
||||
AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
|
||||
let abs_request_handler = AbsRequestHandler {
|
||||
snapshot_request_handler: None,
|
||||
pruned_banks_receiver,
|
||||
};
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let accounts_background_service = AccountsBackgroundService::new(
|
||||
bank_forks.clone(),
|
||||
&exit,
|
||||
abs_request_handler,
|
||||
process_options.accounts_db_caching_enabled,
|
||||
process_options.accounts_db_test_hash_calculation,
|
||||
None,
|
||||
);
|
||||
|
||||
let result = blockstore_processor::process_blockstore_from_root(
|
||||
blockstore,
|
||||
account_paths,
|
||||
None,
|
||||
snapshot_config.as_ref(),
|
||||
process_options,
|
||||
None,
|
||||
&bank_forks,
|
||||
&leader_schedule_cache,
|
||||
&process_options,
|
||||
None,
|
||||
None,
|
||||
&AbsRequestSender::default(),
|
||||
)
|
||||
.map(|(bank_forks, .., starting_snapshot_hashes)| (bank_forks, starting_snapshot_hashes))
|
||||
.map(|_| (bank_forks, starting_snapshot_hashes));
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
accounts_background_service.join().unwrap();
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn compute_slot_cost(blockstore: &Blockstore, slot: Slot) -> Result<(), String> {
|
||||
|
|
|
@ -9,6 +9,7 @@ use {
|
|||
},
|
||||
log::*,
|
||||
solana_runtime::{
|
||||
accounts_background_service::AbsRequestSender,
|
||||
accounts_update_notifier_interface::AccountsUpdateNotifier,
|
||||
bank_forks::BankForks,
|
||||
snapshot_archive_info::SnapshotArchiveInfoGetter,
|
||||
|
@ -68,7 +69,7 @@ pub fn load(
|
|||
&process_options,
|
||||
transaction_status_sender,
|
||||
cache_block_meta_sender,
|
||||
&solana_runtime::accounts_background_service::AbsRequestSender::default(),
|
||||
&AbsRequestSender::default(),
|
||||
)
|
||||
.map(|_| (bank_forks, leader_schedule_cache, starting_snapshot_hashes))
|
||||
}
|
||||
|
|
|
@ -53,6 +53,8 @@ pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;
|
|||
|
||||
/// interval to report bank_drop queue events: 60s
|
||||
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
|
||||
/// maximum drop bank signal queue length
|
||||
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
|
||||
|
||||
/// Bank drop signal queue events
|
||||
#[allow(dead_code)]
|
||||
|
@ -573,6 +575,28 @@ impl AccountsBackgroundService {
|
|||
Self { t_background }
|
||||
}
|
||||
|
||||
/// Should be called immediately after bank_fork_utils::load_bank_forks(), and as such, there
|
||||
/// should only be one bank, the root bank, in `bank_forks`
|
||||
/// All banks added to `bank_forks` will be descended from the root bank, and thus will inherit
|
||||
/// the bank drop callback.
|
||||
pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
|
||||
assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
|
||||
|
||||
let (pruned_banks_sender, pruned_banks_receiver) =
|
||||
crossbeam_channel::bounded(MAX_DROP_BANK_SIGNAL_QUEUE_SIZE);
|
||||
{
|
||||
let root_bank = bank_forks.read().unwrap().root_bank();
|
||||
root_bank.set_callback(Some(Box::new(
|
||||
root_bank
|
||||
.rc
|
||||
.accounts
|
||||
.accounts_db
|
||||
.create_drop_bank_callback(pruned_banks_sender),
|
||||
)));
|
||||
}
|
||||
pruned_banks_receiver
|
||||
}
|
||||
|
||||
pub fn join(self) -> thread::Result<()> {
|
||||
self.t_background.join()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue