add metrics to handle_snapshot_requests (#17937)
This commit is contained in:
parent
a0872232d3
commit
e6bbd4b3f0
|
@ -222,7 +222,7 @@ mod tests {
|
|||
// set_root should send a snapshot request
|
||||
bank_forks.set_root(bank.slot(), &request_sender, None);
|
||||
bank.update_accounts_hash();
|
||||
snapshot_request_handler.handle_snapshot_requests(false, false, false);
|
||||
snapshot_request_handler.handle_snapshot_requests(false, false, false, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -89,11 +89,13 @@ impl SnapshotRequestHandler {
|
|||
accounts_db_caching_enabled: bool,
|
||||
test_hash_calculation: bool,
|
||||
use_index_hash_calculation: bool,
|
||||
non_snapshot_time_us: u128,
|
||||
) -> Option<u64> {
|
||||
self.snapshot_request_receiver
|
||||
.try_iter()
|
||||
.last()
|
||||
.map(|snapshot_request| {
|
||||
let mut total_time = Measure::start("wallclock time elapsed");
|
||||
let SnapshotRequest {
|
||||
snapshot_root_bank,
|
||||
status_cache_slot_deltas,
|
||||
|
@ -188,6 +190,7 @@ impl SnapshotRequestHandler {
|
|||
let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time");
|
||||
snapshot_utils::purge_old_snapshots(&self.snapshot_config.snapshot_path);
|
||||
purge_old_snapshots_time.stop();
|
||||
total_time.stop();
|
||||
|
||||
datapoint_info!(
|
||||
"handle_snapshot_requests-timing",
|
||||
|
@ -205,6 +208,8 @@ impl SnapshotRequestHandler {
|
|||
purge_old_snapshots_time.as_us(),
|
||||
i64
|
||||
),
|
||||
("total_us", total_time.as_us(), i64),
|
||||
("non_snapshot_time_us", non_snapshot_time_us, i64),
|
||||
);
|
||||
snapshot_root_bank.block_height()
|
||||
})
|
||||
|
@ -251,6 +256,7 @@ impl AbsRequestHandler {
|
|||
accounts_db_caching_enabled: bool,
|
||||
test_hash_calculation: bool,
|
||||
use_index_hash_calculation: bool,
|
||||
non_snapshot_time_us: u128,
|
||||
) -> Option<u64> {
|
||||
self.snapshot_request_handler
|
||||
.as_ref()
|
||||
|
@ -259,6 +265,7 @@ impl AbsRequestHandler {
|
|||
accounts_db_caching_enabled,
|
||||
test_hash_calculation,
|
||||
use_index_hash_calculation,
|
||||
non_snapshot_time_us,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
@ -297,87 +304,101 @@ impl AccountsBackgroundService {
|
|||
let mut last_expiration_check_time = Instant::now();
|
||||
let t_background = Builder::new()
|
||||
.name("solana-bg-accounts".to_string())
|
||||
.spawn(move || loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
.spawn(move || {
|
||||
let mut last_snapshot_end_time = None;
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Grab the current root bank
|
||||
let bank = bank_forks.read().unwrap().root_bank().clone();
|
||||
// Grab the current root bank
|
||||
let bank = bank_forks.read().unwrap().root_bank().clone();
|
||||
|
||||
// Purge accounts of any dead slots
|
||||
Self::remove_dead_slots(
|
||||
&bank,
|
||||
&request_handler,
|
||||
&mut removed_slots_count,
|
||||
&mut total_remove_slots_time,
|
||||
);
|
||||
// Purge accounts of any dead slots
|
||||
Self::remove_dead_slots(
|
||||
&bank,
|
||||
&request_handler,
|
||||
&mut removed_slots_count,
|
||||
&mut total_remove_slots_time,
|
||||
);
|
||||
|
||||
Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time);
|
||||
Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time);
|
||||
|
||||
// Check to see if there were any requests for snapshotting banks
|
||||
// < the current root bank `bank` above.
|
||||
let non_snapshot_time = last_snapshot_end_time
|
||||
.map(|last_snapshot_end_time: Instant| {
|
||||
last_snapshot_end_time.elapsed().as_micros()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
// Claim: Any snapshot request for slot `N` found here implies that the last cleanup
|
||||
// slot `M` satisfies `M < N`
|
||||
//
|
||||
// Proof: Assume for contradiction that we find a snapshot request for slot `N` here,
|
||||
// but cleanup has already happened on some slot `M >= N`. Because the call to
|
||||
// `bank.clean_accounts(true)` (in the code below) implies we only clean slots `<= bank - 1`,
|
||||
// then that means in some *previous* iteration of this loop, we must have gotten a root
|
||||
// bank for slot some slot `R` where `R > N`, but did not see the snapshot for `N` in the
|
||||
// snapshot request channel.
|
||||
//
|
||||
// However, this is impossible because BankForks.set_root() will always flush the snapshot
|
||||
// request for `N` to the snapshot request channel before setting a root `R > N`, and
|
||||
// snapshot_request_handler.handle_requests() will always look for the latest
|
||||
// available snapshot in the channel.
|
||||
let snapshot_block_height = request_handler.handle_snapshot_requests(
|
||||
accounts_db_caching_enabled,
|
||||
test_hash_calculation,
|
||||
use_index_hash_calculation,
|
||||
);
|
||||
if accounts_db_caching_enabled {
|
||||
// Note that the flush will do an internal clean of the
|
||||
// cache up to bank.slot(), so should be safe as long
|
||||
// as any later snapshots that are taken are of
|
||||
// slots >= bank.slot()
|
||||
bank.flush_accounts_cache_if_needed();
|
||||
}
|
||||
// Check to see if there were any requests for snapshotting banks
|
||||
// < the current root bank `bank` above.
|
||||
|
||||
// Claim: Any snapshot request for slot `N` found here implies that the last cleanup
|
||||
// slot `M` satisfies `M < N`
|
||||
//
|
||||
// Proof: Assume for contradiction that we find a snapshot request for slot `N` here,
|
||||
// but cleanup has already happened on some slot `M >= N`. Because the call to
|
||||
// `bank.clean_accounts(true)` (in the code below) implies we only clean slots `<= bank - 1`,
|
||||
// then that means in some *previous* iteration of this loop, we must have gotten a root
|
||||
// bank for slot some slot `R` where `R > N`, but did not see the snapshot for `N` in the
|
||||
// snapshot request channel.
|
||||
//
|
||||
// However, this is impossible because BankForks.set_root() will always flush the snapshot
|
||||
// request for `N` to the snapshot request channel before setting a root `R > N`, and
|
||||
// snapshot_request_handler.handle_requests() will always look for the latest
|
||||
// available snapshot in the channel.
|
||||
let snapshot_block_height = request_handler.handle_snapshot_requests(
|
||||
accounts_db_caching_enabled,
|
||||
test_hash_calculation,
|
||||
use_index_hash_calculation,
|
||||
non_snapshot_time,
|
||||
);
|
||||
if snapshot_block_height.is_some() {
|
||||
last_snapshot_end_time = Some(Instant::now());
|
||||
}
|
||||
|
||||
if let Some(snapshot_block_height) = snapshot_block_height {
|
||||
// Safe, see proof above
|
||||
assert!(last_cleaned_block_height <= snapshot_block_height);
|
||||
last_cleaned_block_height = snapshot_block_height;
|
||||
} else {
|
||||
if accounts_db_caching_enabled {
|
||||
bank.shrink_candidate_slots();
|
||||
// Note that the flush will do an internal clean of the
|
||||
// cache up to bank.slot(), so should be safe as long
|
||||
// as any later snapshots that are taken are of
|
||||
// slots >= bank.slot()
|
||||
bank.flush_accounts_cache_if_needed();
|
||||
}
|
||||
|
||||
if let Some(snapshot_block_height) = snapshot_block_height {
|
||||
// Safe, see proof above
|
||||
assert!(last_cleaned_block_height <= snapshot_block_height);
|
||||
last_cleaned_block_height = snapshot_block_height;
|
||||
} else {
|
||||
// under sustained writes, shrink can lag behind so cap to
|
||||
// SHRUNKEN_ACCOUNT_PER_INTERVAL (which is based on INTERVAL_MS,
|
||||
// which in turn roughly associated block time)
|
||||
consumed_budget = bank
|
||||
.process_stale_slot_with_budget(
|
||||
consumed_budget,
|
||||
SHRUNKEN_ACCOUNT_PER_INTERVAL,
|
||||
)
|
||||
.min(SHRUNKEN_ACCOUNT_PER_INTERVAL);
|
||||
}
|
||||
if bank.block_height() - last_cleaned_block_height
|
||||
> (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0, 10))
|
||||
{
|
||||
if accounts_db_caching_enabled {
|
||||
// Note that the flush will do an internal clean of the
|
||||
// cache up to bank.slot(), so should be safe as long
|
||||
// as any later snapshots that are taken are of
|
||||
// slots >= bank.slot()
|
||||
bank.force_flush_accounts_cache();
|
||||
bank.shrink_candidate_slots();
|
||||
} else {
|
||||
// under sustained writes, shrink can lag behind so cap to
|
||||
// SHRUNKEN_ACCOUNT_PER_INTERVAL (which is based on INTERVAL_MS,
|
||||
// which in turn roughly associated block time)
|
||||
consumed_budget = bank
|
||||
.process_stale_slot_with_budget(
|
||||
consumed_budget,
|
||||
SHRUNKEN_ACCOUNT_PER_INTERVAL,
|
||||
)
|
||||
.min(SHRUNKEN_ACCOUNT_PER_INTERVAL);
|
||||
}
|
||||
if bank.block_height() - last_cleaned_block_height
|
||||
> (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0, 10))
|
||||
{
|
||||
if accounts_db_caching_enabled {
|
||||
// Note that the flush will do an internal clean of the
|
||||
// cache up to bank.slot(), so should be safe as long
|
||||
// as any later snapshots that are taken are of
|
||||
// slots >= bank.slot()
|
||||
bank.force_flush_accounts_cache();
|
||||
}
|
||||
bank.clean_accounts(true, false);
|
||||
last_cleaned_block_height = bank.block_height();
|
||||
}
|
||||
bank.clean_accounts(true, false);
|
||||
last_cleaned_block_height = bank.block_height();
|
||||
}
|
||||
sleep(Duration::from_millis(INTERVAL_MS));
|
||||
}
|
||||
sleep(Duration::from_millis(INTERVAL_MS));
|
||||
})
|
||||
.unwrap();
|
||||
Self { t_background }
|
||||
|
|
Loading…
Reference in New Issue