Move pruned_banks_receiver into PrunedBanksRequestHandler (#27445)
This commit is contained in:
parent
60f3dc5f72
commit
3c7cd62030
|
@ -67,8 +67,8 @@ use {
|
|||
},
|
||||
solana_runtime::{
|
||||
accounts_background_service::{
|
||||
AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, DroppedSlotsReceiver,
|
||||
SnapshotRequestHandler,
|
||||
AbsRequestHandlers, AbsRequestSender, AccountsBackgroundService, DroppedSlotsReceiver,
|
||||
PrunedBanksRequestHandler, SnapshotRequestHandler,
|
||||
},
|
||||
accounts_db::{AccountShrinkThreshold, AccountsDbConfig},
|
||||
accounts_index::AccountSecondaryIndexes,
|
||||
|
@ -640,13 +640,16 @@ impl Validator {
|
|||
config.snapshot_config.clone(),
|
||||
);
|
||||
|
||||
let pruned_banks_request_handler = PrunedBanksRequestHandler {
|
||||
pruned_banks_receiver,
|
||||
};
|
||||
let last_full_snapshot_slot = starting_snapshot_hashes.map(|x| x.full.hash.0);
|
||||
let accounts_background_service = AccountsBackgroundService::new(
|
||||
bank_forks.clone(),
|
||||
&exit,
|
||||
AbsRequestHandler {
|
||||
AbsRequestHandlers {
|
||||
snapshot_request_handler,
|
||||
pruned_banks_receiver,
|
||||
pruned_banks_request_handler,
|
||||
},
|
||||
config.accounts_db_caching_enabled,
|
||||
config.accounts_db_test_hash_calculation,
|
||||
|
|
|
@ -13,7 +13,8 @@ use {
|
|||
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
|
||||
solana_runtime::{
|
||||
accounts_background_service::{
|
||||
AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, SnapshotRequestHandler,
|
||||
AbsRequestHandlers, AbsRequestSender, AccountsBackgroundService,
|
||||
PrunedBanksRequestHandler, SnapshotRequestHandler,
|
||||
},
|
||||
accounts_db::{self, ACCOUNTS_DB_CONFIG_FOR_TESTING},
|
||||
accounts_index::AccountSecondaryIndexes,
|
||||
|
@ -929,10 +930,13 @@ fn test_snapshots_with_background_services(
|
|||
snapshot_request_receiver,
|
||||
pending_accounts_package: Arc::clone(&pending_accounts_package),
|
||||
});
|
||||
let abs_request_handler = AbsRequestHandler {
|
||||
snapshot_request_handler,
|
||||
let pruned_banks_request_handler = PrunedBanksRequestHandler {
|
||||
pruned_banks_receiver,
|
||||
};
|
||||
let abs_request_handler = AbsRequestHandlers {
|
||||
snapshot_request_handler,
|
||||
pruned_banks_request_handler,
|
||||
};
|
||||
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let snapshot_packager_service = SnapshotPackagerService::new(
|
||||
|
|
|
@ -37,7 +37,8 @@ use {
|
|||
solana_measure::{measure, measure::Measure},
|
||||
solana_runtime::{
|
||||
accounts_background_service::{
|
||||
AbsRequestHandler, AbsRequestSender, AccountsBackgroundService,
|
||||
AbsRequestHandlers, AbsRequestSender, AccountsBackgroundService,
|
||||
PrunedBanksRequestHandler,
|
||||
},
|
||||
accounts_db::{AccountsDbConfig, FillerAccountsConfig},
|
||||
accounts_index::{AccountsIndexConfig, IndexLimitMb, ScanConfig},
|
||||
|
@ -1030,10 +1031,13 @@ fn load_bank_forks(
|
|||
|
||||
let pruned_banks_receiver =
|
||||
AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
|
||||
let abs_request_handler = AbsRequestHandler {
|
||||
snapshot_request_handler: None,
|
||||
let pruned_banks_request_handler = PrunedBanksRequestHandler {
|
||||
pruned_banks_receiver,
|
||||
};
|
||||
let abs_request_handler = AbsRequestHandlers {
|
||||
snapshot_request_handler: None,
|
||||
pruned_banks_request_handler,
|
||||
};
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let accounts_background_service = AccountsBackgroundService::new(
|
||||
bank_forks.clone(),
|
||||
|
|
|
@ -399,12 +399,55 @@ impl AbsRequestSender {
|
|||
}
|
||||
}
|
||||
|
||||
pub struct AbsRequestHandler {
|
||||
pub snapshot_request_handler: Option<SnapshotRequestHandler>,
|
||||
#[derive(Debug)]
|
||||
pub struct PrunedBanksRequestHandler {
|
||||
pub pruned_banks_receiver: DroppedSlotsReceiver,
|
||||
}
|
||||
|
||||
impl AbsRequestHandler {
|
||||
impl PrunedBanksRequestHandler {
|
||||
pub fn handle_request(&self, bank: &Bank, is_serialized_with_abs: bool) -> usize {
|
||||
let mut count = 0;
|
||||
for (pruned_slot, pruned_bank_id) in self.pruned_banks_receiver.try_iter() {
|
||||
count += 1;
|
||||
bank.rc.accounts.accounts_db.purge_slot(
|
||||
pruned_slot,
|
||||
pruned_bank_id,
|
||||
is_serialized_with_abs,
|
||||
);
|
||||
}
|
||||
|
||||
count
|
||||
}
|
||||
|
||||
fn remove_dead_slots(
|
||||
&self,
|
||||
bank: &Bank,
|
||||
removed_slots_count: &mut usize,
|
||||
total_remove_slots_time: &mut u64,
|
||||
) {
|
||||
let mut remove_slots_time = Measure::start("remove_slots_time");
|
||||
*removed_slots_count += self.handle_request(bank, true);
|
||||
remove_slots_time.stop();
|
||||
*total_remove_slots_time += remove_slots_time.as_us();
|
||||
|
||||
if *removed_slots_count >= 100 {
|
||||
datapoint_info!(
|
||||
"remove_slots_timing",
|
||||
("remove_slots_time", *total_remove_slots_time, i64),
|
||||
("removed_slots_count", *removed_slots_count, i64),
|
||||
);
|
||||
*total_remove_slots_time = 0;
|
||||
*removed_slots_count = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AbsRequestHandlers {
|
||||
pub snapshot_request_handler: Option<SnapshotRequestHandler>,
|
||||
pub pruned_banks_request_handler: PrunedBanksRequestHandler,
|
||||
}
|
||||
|
||||
impl AbsRequestHandlers {
|
||||
// Returns the latest requested snapshot block height, if one exists
|
||||
pub fn handle_snapshot_requests(
|
||||
&self,
|
||||
|
@ -424,20 +467,6 @@ impl AbsRequestHandler {
|
|||
)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn handle_pruned_banks(&self, bank: &Bank, is_serialized_with_abs: bool) -> usize {
|
||||
let mut count = 0;
|
||||
for (pruned_slot, pruned_bank_id) in self.pruned_banks_receiver.try_iter() {
|
||||
count += 1;
|
||||
bank.rc.accounts.accounts_db.purge_slot(
|
||||
pruned_slot,
|
||||
pruned_bank_id,
|
||||
is_serialized_with_abs,
|
||||
);
|
||||
}
|
||||
|
||||
count
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AccountsBackgroundService {
|
||||
|
@ -448,7 +477,7 @@ impl AccountsBackgroundService {
|
|||
pub fn new(
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
request_handler: AbsRequestHandler,
|
||||
request_handlers: AbsRequestHandlers,
|
||||
accounts_db_caching_enabled: bool,
|
||||
test_hash_calculation: bool,
|
||||
mut last_full_snapshot_slot: Option<Slot>,
|
||||
|
@ -475,12 +504,13 @@ impl AccountsBackgroundService {
|
|||
let bank = bank_forks.read().unwrap().root_bank().clone();
|
||||
|
||||
// Purge accounts of any dead slots
|
||||
Self::remove_dead_slots(
|
||||
&bank,
|
||||
&request_handler,
|
||||
&mut removed_slots_count,
|
||||
&mut total_remove_slots_time,
|
||||
);
|
||||
request_handlers
|
||||
.pruned_banks_request_handler
|
||||
.remove_dead_slots(
|
||||
&bank,
|
||||
&mut removed_slots_count,
|
||||
&mut total_remove_slots_time,
|
||||
);
|
||||
|
||||
Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time);
|
||||
|
||||
|
@ -507,7 +537,7 @@ impl AccountsBackgroundService {
|
|||
// request for `N` to the snapshot request channel before setting a root `R > N`, and
|
||||
// snapshot_request_handler.handle_requests() will always look for the latest
|
||||
// available snapshot in the channel.
|
||||
let snapshot_block_height_option_result = request_handler
|
||||
let snapshot_block_height_option_result = request_handlers
|
||||
.handle_snapshot_requests(
|
||||
accounts_db_caching_enabled,
|
||||
test_hash_calculation,
|
||||
|
@ -598,28 +628,6 @@ impl AccountsBackgroundService {
|
|||
self.t_background.join()
|
||||
}
|
||||
|
||||
fn remove_dead_slots(
|
||||
bank: &Bank,
|
||||
request_handler: &AbsRequestHandler,
|
||||
removed_slots_count: &mut usize,
|
||||
total_remove_slots_time: &mut u64,
|
||||
) {
|
||||
let mut remove_slots_time = Measure::start("remove_slots_time");
|
||||
*removed_slots_count += request_handler.handle_pruned_banks(bank, true);
|
||||
remove_slots_time.stop();
|
||||
*total_remove_slots_time += remove_slots_time.as_us();
|
||||
|
||||
if *removed_slots_count >= 100 {
|
||||
datapoint_info!(
|
||||
"remove_slots_timing",
|
||||
("remove_slots_time", *total_remove_slots_time, i64),
|
||||
("removed_slots_count", *removed_slots_count, i64),
|
||||
);
|
||||
*total_remove_slots_time = 0;
|
||||
*removed_slots_count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
fn expire_old_recycle_stores(bank: &Bank, last_expiration_check_time: &mut Instant) {
|
||||
let now = Instant::now();
|
||||
if now.duration_since(*last_expiration_check_time).as_secs()
|
||||
|
@ -645,8 +653,7 @@ mod test {
|
|||
let genesis = create_genesis_config(10);
|
||||
let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
|
||||
let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
|
||||
let request_handler = AbsRequestHandler {
|
||||
snapshot_request_handler: None,
|
||||
let pruned_banks_request_handler = PrunedBanksRequestHandler {
|
||||
pruned_banks_receiver,
|
||||
};
|
||||
|
||||
|
@ -661,7 +668,7 @@ mod test {
|
|||
|
||||
assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
|
||||
|
||||
AccountsBackgroundService::remove_dead_slots(&bank0, &request_handler, &mut 0, &mut 0);
|
||||
pruned_banks_request_handler.remove_dead_slots(&bank0, &mut 0, &mut 0);
|
||||
|
||||
assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
|
||||
}
|
||||
|
|
|
@ -7997,7 +7997,7 @@ pub(crate) mod tests {
|
|||
use {
|
||||
super::*,
|
||||
crate::{
|
||||
accounts_background_service::{AbsRequestHandler, SendDroppedBankCallback},
|
||||
accounts_background_service::{PrunedBanksRequestHandler, SendDroppedBankCallback},
|
||||
accounts_db::DEFAULT_ACCOUNTS_SHRINK_RATIO,
|
||||
accounts_index::{AccountIndex, AccountSecondaryIndexes, ScanError, ITER_BATCH_SIZE},
|
||||
ancestors::Ancestors,
|
||||
|
@ -16126,8 +16126,7 @@ pub(crate) mod tests {
|
|||
fn test_store_scan_consistency_unrooted() {
|
||||
for accounts_db_caching_enabled in &[false, true] {
|
||||
let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
|
||||
let abs_request_handler = AbsRequestHandler {
|
||||
snapshot_request_handler: None,
|
||||
let pruned_banks_request_handler = PrunedBanksRequestHandler {
|
||||
pruned_banks_receiver,
|
||||
};
|
||||
test_store_scan_consistency(
|
||||
|
@ -16211,7 +16210,7 @@ pub(crate) mod tests {
|
|||
current_major_fork_bank.clean_accounts_for_tests();
|
||||
// Move purge here so that Bank::drop()->purge_slots() doesn't race
|
||||
// with clean. Simulates the call from AccountsBackgroundService
|
||||
abs_request_handler.handle_pruned_banks(¤t_major_fork_bank, true);
|
||||
pruned_banks_request_handler.handle_request(¤t_major_fork_bank, true);
|
||||
}
|
||||
},
|
||||
Some(Box::new(SendDroppedBankCallback::new(
|
||||
|
|
Loading…
Reference in New Issue