2020-09-28 16:04:46 -07:00
// Service to clean up dead slots in accounts_db
//
// This can be expensive since we have to walk the append vecs being cleaned up.
2022-06-29 19:21:30 -07:00
mod stats ;
2021-12-03 09:00:31 -08:00
use {
crate ::{
2022-04-08 08:42:03 -07:00
accounts_hash ::CalcAccountsHashConfig ,
2021-12-03 09:00:31 -08:00
bank ::{ Bank , BankSlotDelta , DropCallback } ,
bank_forks ::BankForks ,
snapshot_config ::SnapshotConfig ,
2022-09-07 13:41:40 -07:00
snapshot_package ::{ AccountsPackageType , PendingAccountsPackage , SnapshotType } ,
2021-12-03 09:00:31 -08:00
snapshot_utils ::{ self , SnapshotError } ,
} ,
2022-08-26 11:43:59 -07:00
crossbeam_channel ::{ Receiver , SendError , Sender } ,
2021-12-03 09:00:31 -08:00
log ::* ,
rand ::{ thread_rng , Rng } ,
solana_measure ::measure ::Measure ,
solana_sdk ::{
clock ::{ BankId , Slot } ,
hash ::Hash ,
} ,
2022-06-29 19:21:30 -07:00
stats ::StatsManager ,
2021-12-03 09:00:31 -08:00
std ::{
boxed ::Box ,
fmt ::{ Debug , Formatter } ,
sync ::{
2022-08-26 12:07:05 -07:00
atomic ::{ AtomicBool , AtomicU64 , Ordering } ,
2021-12-03 09:00:31 -08:00
Arc , RwLock ,
} ,
thread ::{ self , sleep , Builder , JoinHandle } ,
time ::{ Duration , Instant } ,
2020-12-12 17:22:34 -08:00
} ,
2020-09-28 16:04:46 -07:00
} ;
const INTERVAL_MS : u64 = 100 ;
const SHRUNKEN_ACCOUNT_PER_SEC : usize = 250 ;
const SHRUNKEN_ACCOUNT_PER_INTERVAL : usize =
SHRUNKEN_ACCOUNT_PER_SEC / ( 1000 / INTERVAL_MS as usize ) ;
const CLEAN_INTERVAL_BLOCKS : u64 = 100 ;
2021-02-25 00:27:27 -08:00
// This value is chosen to spread the dropping cost over 3 expiration checks
// RecycleStores are fully populated almost all of its lifetime. So, otherwise
// this would drop MAX_RECYCLE_STORES mmaps at once in the worst case...
// (Anyway, the dropping part is outside the AccountsDb::recycle_stores lock
// and dropped in this AccountsBackgroundServe, so this shouldn't matter much)
const RECYCLE_STORE_EXPIRATION_INTERVAL_SECS : u64 = crate ::accounts_db ::EXPIRATION_TTL_SECONDS / 3 ;
2020-09-28 16:04:46 -07:00
pub type SnapshotRequestSender = Sender < SnapshotRequest > ;
pub type SnapshotRequestReceiver = Receiver < SnapshotRequest > ;
2021-06-14 21:04:01 -07:00
pub type DroppedSlotsSender = Sender < ( Slot , BankId ) > ;
pub type DroppedSlotsReceiver = Receiver < ( Slot , BankId ) > ;
2020-12-12 17:22:34 -08:00
2022-08-26 12:07:05 -07:00
/// interval to report bank_drop queue events: 60s
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL : u64 = 60_000 ;
/// maximum drop bank signal queue length
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE : usize = 10_000 ;
#[ derive(Debug, Default) ]
struct PrunedBankQueueLenReporter {
last_report_time : AtomicU64 ,
}
impl PrunedBankQueueLenReporter {
fn report ( & self , q_len : usize ) {
let now = solana_sdk ::timing ::timestamp ( ) ;
let last_report_time = self . last_report_time . load ( Ordering ::Acquire ) ;
if q_len > MAX_DROP_BANK_SIGNAL_QUEUE_SIZE
& & now . saturating_sub ( last_report_time ) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL
{
2022-08-26 12:29:01 -07:00
datapoint_warn! ( " excessive_pruned_bank_channel_len " , ( " len " , q_len , i64 ) ) ;
2022-08-26 12:07:05 -07:00
self . last_report_time . store ( now , Ordering ::Release ) ;
}
}
}
// Process-wide reporter instance, lazily initialized on first use.
// `SendDroppedBankCallback::callback` feeds it the current length of the dropped-bank channel.
lazy_static! {
static ref BANK_DROP_QUEUE_REPORTER : PrunedBankQueueLenReporter =
PrunedBankQueueLenReporter ::default ( ) ;
}
2020-12-12 17:22:34 -08:00
#[ derive(Clone) ]
pub struct SendDroppedBankCallback {
sender : DroppedSlotsSender ,
}
impl DropCallback for SendDroppedBankCallback {
fn callback ( & self , bank : & Bank ) {
2022-08-26 12:07:05 -07:00
BANK_DROP_QUEUE_REPORTER . report ( self . sender . len ( ) ) ;
2022-08-26 12:29:01 -07:00
if let Err ( SendError ( _ ) ) = self . sender . send ( ( bank . slot ( ) , bank . bank_id ( ) ) ) {
info! ( " bank DropCallback signal queue disconnected. " ) ;
2020-12-12 17:22:34 -08:00
}
}
fn clone_box ( & self ) -> Box < dyn DropCallback + Send + Sync > {
Box ::new ( self . clone ( ) )
}
}
impl Debug for SendDroppedBankCallback {
fn fmt ( & self , f : & mut Formatter ) -> std ::fmt ::Result {
write! ( f , " SendDroppedBankCallback({:p}) " , self )
}
}
impl SendDroppedBankCallback {
pub fn new ( sender : DroppedSlotsSender ) -> Self {
2022-08-26 12:29:01 -07:00
Self { sender }
2020-12-12 17:22:34 -08:00
}
}
2020-09-28 16:04:46 -07:00
pub struct SnapshotRequest {
pub snapshot_root_bank : Arc < Bank > ,
pub status_cache_slot_deltas : Vec < BankSlotDelta > ,
2022-09-07 13:41:40 -07:00
pub request_type : SnapshotRequestType ,
}
2022-09-21 09:39:09 -07:00
impl Debug for SnapshotRequest {
fn fmt ( & self , f : & mut std ::fmt ::Formatter < '_ > ) -> std ::fmt ::Result {
f . debug_struct ( " SnapshotRequest " )
. field ( " request type " , & self . request_type )
. field ( " bank slot " , & self . snapshot_root_bank . slot ( ) )
. finish ( )
}
}
/// What type of request is this?
///
/// The snapshot request has been expanded to support more than just snapshots. This is
/// confusing, but can be resolved by renaming this type; or better, by creating an enum with
/// variants that wrap the fields-of-interest for each request.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum SnapshotRequestType {
    /// Regular request; resolved by `new_accounts_package_type` into a full snapshot, an
    /// incremental snapshot, or an accounts-hash-verifier package.
    Snapshot,
    /// Epoch accounts hash (EAH) request.
    EpochAccountsHash,
}
pub struct SnapshotRequestHandler {
pub snapshot_config : SnapshotConfig ,
pub snapshot_request_receiver : SnapshotRequestReceiver ,
2022-04-06 03:47:19 -07:00
pub pending_accounts_package : PendingAccountsPackage ,
2020-09-28 16:04:46 -07:00
}
impl SnapshotRequestHandler {
// Returns the latest requested snapshot slot, if one exists
2021-02-04 07:00:33 -08:00
pub fn handle_snapshot_requests (
& self ,
accounts_db_caching_enabled : bool ,
test_hash_calculation : bool ,
2021-06-14 13:46:19 -07:00
non_snapshot_time_us : u128 ,
2021-08-31 16:33:27 -07:00
last_full_snapshot_slot : & mut Option < Slot > ,
) -> Option < Result < u64 , SnapshotError > > {
2022-09-21 09:39:09 -07:00
self . snapshot_request_receiver . try_iter ( )
. map ( | request | {
let accounts_package_type = new_accounts_package_type ( & request , & self . snapshot_config , * last_full_snapshot_slot ) ;
( request , accounts_package_type )
} )
. inspect ( | ( request , package_type ) | trace! ( " outstanding snapshot request: {:?}, {:?} " , request , package_type ) )
. max_by ( cmp_snapshot_requests )
. map ( | ( snapshot_request , accounts_package_type ) | {
trace! ( " handling snapshot request: {:?}, {:?} " , snapshot_request , accounts_package_type ) ;
2021-06-15 07:01:11 -07:00
let mut total_time = Measure ::start ( " snapshot_request_receiver_total_time " ) ;
2022-09-20 17:31:08 -07:00
let accounts_package_type = new_accounts_package_type ( & snapshot_request , & self . snapshot_config , * last_full_snapshot_slot ) ;
2020-09-28 16:04:46 -07:00
let SnapshotRequest {
snapshot_root_bank ,
status_cache_slot_deltas ,
2022-09-30 11:59:41 -07:00
request_type ,
2020-09-28 16:04:46 -07:00
} = snapshot_request ;
2022-09-30 11:59:41 -07:00
// we should not rely on the state of this validator until startup verification is complete (unless handling an EAH request)
assert! ( snapshot_root_bank . is_startup_verification_complete ( ) | | request_type = = SnapshotRequestType ::EpochAccountsHash ) ;
2022-06-29 14:48:33 -07:00
2021-02-12 16:35:11 -08:00
let previous_hash = if test_hash_calculation {
// We have to use the index version here.
2022-04-08 08:42:03 -07:00
// We cannot calculate the non-index way because cache has not been flushed and stores don't match reality. This comment is out of date and can be re-evaluated.
2022-01-31 09:39:23 -08:00
snapshot_root_bank . update_accounts_hash_with_index_option ( true , false , false )
2021-02-12 16:35:11 -08:00
} else {
Hash ::default ( )
} ;
2020-09-28 16:04:46 -07:00
let mut shrink_time = Measure ::start ( " shrink_time " ) ;
2021-01-11 17:00:23 -08:00
if ! accounts_db_caching_enabled {
snapshot_root_bank
. process_stale_slot_with_budget ( 0 , SHRUNKEN_ACCOUNT_PER_INTERVAL ) ;
}
2020-09-28 16:04:46 -07:00
shrink_time . stop ( ) ;
2021-01-11 17:00:23 -08:00
let mut flush_accounts_cache_time = Measure ::start ( " flush_accounts_cache_time " ) ;
if accounts_db_caching_enabled {
2021-01-26 20:08:55 -08:00
// Forced cache flushing MUST flush all roots <= snapshot_root_bank.slot().
// That's because `snapshot_root_bank.slot()` must be root at this point,
// and contains relevant updates because each bank has at least 1 account update due
// to sysvar maintenance. Otherwise, this would cause missing storages in the snapshot
2021-01-11 17:00:23 -08:00
snapshot_root_bank . force_flush_accounts_cache ( ) ;
2021-01-26 20:08:55 -08:00
// Ensure all roots <= `self.slot()` have been flushed.
// Note `max_flush_root` could be larger than self.slot() if there are
// `> MAX_CACHE_SLOT` cached and rooted slots which triggered earlier flushes.
assert! (
snapshot_root_bank . slot ( )
< = snapshot_root_bank
. rc
. accounts
. accounts_db
. accounts_cache
. fetch_max_flush_root ( )
) ;
2021-01-11 17:00:23 -08:00
}
flush_accounts_cache_time . stop ( ) ;
2021-02-10 12:38:00 -08:00
let hash_for_testing = if test_hash_calculation {
2022-04-08 08:42:03 -07:00
let use_index_hash_calculation = false ;
let check_hash = false ;
2022-05-05 13:16:15 -07:00
let ( this_hash , capitalization ) = snapshot_root_bank . accounts ( ) . accounts_db . calculate_accounts_hash_helper (
2022-04-08 08:42:03 -07:00
use_index_hash_calculation ,
snapshot_root_bank . slot ( ) ,
& CalcAccountsHashConfig {
use_bg_thread_pool : true ,
check_hash ,
ancestors : None ,
2022-05-02 11:46:17 -07:00
epoch_schedule : snapshot_root_bank . epoch_schedule ( ) ,
2022-04-11 11:15:00 -07:00
rent_collector : snapshot_root_bank . rent_collector ( ) ,
2022-07-19 07:55:52 -07:00
store_detailed_debug_info_on_failure : false ,
2022-07-28 07:46:34 -07:00
full_snapshot : None ,
2022-09-14 11:29:23 -07:00
enable_rehashing : snapshot_root_bank . bank_enable_rehashing_on_accounts_hash ( ) ,
2022-04-08 08:42:03 -07:00
} ,
) . unwrap ( ) ;
2021-02-12 16:35:11 -08:00
assert_eq! ( previous_hash , this_hash ) ;
2022-05-05 13:16:15 -07:00
assert_eq! ( capitalization , snapshot_root_bank . capitalization ( ) ) ;
2022-04-08 08:42:03 -07:00
Some ( this_hash )
2021-02-10 12:38:00 -08:00
} else {
None
} ;
2021-02-04 07:00:33 -08:00
2020-09-28 16:04:46 -07:00
let mut clean_time = Measure ::start ( " clean_time " ) ;
2022-08-19 15:15:04 -07:00
snapshot_root_bank . clean_accounts ( * last_full_snapshot_slot ) ;
2020-09-28 16:04:46 -07:00
clean_time . stop ( ) ;
2021-01-11 17:00:23 -08:00
if accounts_db_caching_enabled {
shrink_time = Measure ::start ( " shrink_time " ) ;
snapshot_root_bank . shrink_candidate_slots ( ) ;
shrink_time . stop ( ) ;
}
2022-09-20 17:31:08 -07:00
if accounts_package_type = = AccountsPackageType ::Snapshot ( SnapshotType ::FullSnapshot ) {
* last_full_snapshot_slot = Some ( snapshot_root_bank . slot ( ) ) ;
}
2021-08-31 16:33:27 -07:00
// Snapshot the bank and send over an accounts package
2020-09-28 16:04:46 -07:00
let mut snapshot_time = Measure ::start ( " snapshot_time " ) ;
2021-08-31 16:33:27 -07:00
let result = snapshot_utils ::snapshot_bank (
2020-09-28 16:04:46 -07:00
& snapshot_root_bank ,
status_cache_slot_deltas ,
2022-04-06 03:47:19 -07:00
& self . pending_accounts_package ,
2021-08-21 13:41:03 -07:00
& self . snapshot_config . bank_snapshots_dir ,
2022-05-10 13:37:41 -07:00
& self . snapshot_config . full_snapshot_archives_dir ,
& self . snapshot_config . incremental_snapshot_archives_dir ,
2020-09-28 16:04:46 -07:00
self . snapshot_config . snapshot_version ,
2021-08-31 16:33:27 -07:00
self . snapshot_config . archive_format ,
2021-02-04 07:00:33 -08:00
hash_for_testing ,
2022-09-07 13:41:40 -07:00
accounts_package_type ,
2020-09-28 16:04:46 -07:00
) ;
2021-08-31 16:33:27 -07:00
if let Err ( e ) = result {
2020-09-28 16:04:46 -07:00
warn! (
2022-09-07 13:41:40 -07:00
" Error taking bank snapshot. slot: {}, accounts package type: {:?}, err: {:?} " ,
2020-09-28 16:04:46 -07:00
snapshot_root_bank . slot ( ) ,
2022-09-07 13:41:40 -07:00
accounts_package_type ,
2021-08-31 16:33:27 -07:00
e ,
2020-09-28 16:04:46 -07:00
) ;
2021-08-31 16:33:27 -07:00
if Self ::is_snapshot_error_fatal ( & e ) {
return Err ( e ) ;
}
2020-09-28 16:04:46 -07:00
}
snapshot_time . stop ( ) ;
2022-09-07 13:41:40 -07:00
info! ( " Took bank snapshot. accounts package type: {:?}, slot: {}, accounts hash: {}, bank hash: {} " ,
accounts_package_type ,
2021-09-15 05:39:21 -07:00
snapshot_root_bank . slot ( ) ,
snapshot_root_bank . get_accounts_hash ( ) ,
snapshot_root_bank . hash ( ) ,
) ;
2020-09-28 16:04:46 -07:00
// Cleanup outdated snapshots
let mut purge_old_snapshots_time = Measure ::start ( " purge_old_snapshots_time " ) ;
2021-08-21 13:41:03 -07:00
snapshot_utils ::purge_old_bank_snapshots ( & self . snapshot_config . bank_snapshots_dir ) ;
2020-09-28 16:04:46 -07:00
purge_old_snapshots_time . stop ( ) ;
2021-06-14 13:46:19 -07:00
total_time . stop ( ) ;
2020-09-28 16:04:46 -07:00
datapoint_info! (
" handle_snapshot_requests-timing " ,
2021-01-11 17:00:23 -08:00
(
" flush_accounts_cache_time " ,
flush_accounts_cache_time . as_us ( ) ,
i64
) ,
2020-09-28 16:04:46 -07:00
( " shrink_time " , shrink_time . as_us ( ) , i64 ) ,
( " clean_time " , clean_time . as_us ( ) , i64 ) ,
( " snapshot_time " , snapshot_time . as_us ( ) , i64 ) ,
(
" purge_old_snapshots_time " ,
purge_old_snapshots_time . as_us ( ) ,
i64
2020-10-19 09:48:29 -07:00
) ,
2021-06-14 13:46:19 -07:00
( " total_us " , total_time . as_us ( ) , i64 ) ,
( " non_snapshot_time_us " , non_snapshot_time_us , i64 ) ,
2020-09-28 16:04:46 -07:00
) ;
2021-08-31 16:33:27 -07:00
Ok ( snapshot_root_bank . block_height ( ) )
2020-09-28 16:04:46 -07:00
} )
}
2021-08-31 16:33:27 -07:00
/// Check if a SnapshotError should be treated as 'fatal' by SnapshotRequestHandler, and
/// `handle_snapshot_requests()` in particular. Fatal errors will cause the node to shutdown.
/// Non-fatal errors are logged and then swallowed.
///
/// All `SnapshotError`s are enumerated, and there is **NO** default case. This way, if
/// a new error is added to SnapshotError, a conscious decision must be made on how it should
/// be handled.
fn is_snapshot_error_fatal ( err : & SnapshotError ) -> bool {
match err {
SnapshotError ::Io ( .. ) = > true ,
SnapshotError ::Serialize ( .. ) = > true ,
SnapshotError ::ArchiveGenerationFailure ( .. ) = > true ,
SnapshotError ::StoragePathSymlinkInvalid = > true ,
SnapshotError ::UnpackError ( .. ) = > true ,
SnapshotError ::IoWithSource ( .. ) = > true ,
SnapshotError ::PathToFileNameError ( .. ) = > true ,
SnapshotError ::FileNameToStrError ( .. ) = > true ,
SnapshotError ::ParseSnapshotArchiveFileNameError ( .. ) = > true ,
SnapshotError ::MismatchedBaseSlot ( .. ) = > true ,
SnapshotError ::NoSnapshotArchives = > true ,
SnapshotError ::MismatchedSlotHash ( .. ) = > true ,
2022-08-18 06:48:58 -07:00
SnapshotError ::VerifySlotDeltas ( .. ) = > true ,
2021-08-31 16:33:27 -07:00
}
}
2020-09-28 16:04:46 -07:00
}
2022-03-23 10:04:58 -07:00
#[ derive(Default, Clone) ]
2021-02-18 23:42:09 -08:00
pub struct AbsRequestSender {
2020-12-12 17:22:34 -08:00
snapshot_request_sender : Option < SnapshotRequestSender > ,
}
2021-02-18 23:42:09 -08:00
impl AbsRequestSender {
2022-03-15 13:14:49 -07:00
pub fn new ( snapshot_request_sender : SnapshotRequestSender ) -> Self {
Self {
snapshot_request_sender : Some ( snapshot_request_sender ) ,
2020-12-12 17:22:34 -08:00
}
}
pub fn is_snapshot_creation_enabled ( & self ) -> bool {
self . snapshot_request_sender . is_some ( )
}
pub fn send_snapshot_request (
& self ,
snapshot_request : SnapshotRequest ,
) -> Result < ( ) , SendError < SnapshotRequest > > {
if let Some ( ref snapshot_request_sender ) = self . snapshot_request_sender {
snapshot_request_sender . send ( snapshot_request )
} else {
Ok ( ( ) )
}
}
}
2022-08-29 10:30:06 -07:00
#[ derive(Debug) ]
pub struct PrunedBanksRequestHandler {
2020-12-12 17:22:34 -08:00
pub pruned_banks_receiver : DroppedSlotsReceiver ,
}
2022-08-29 10:30:06 -07:00
impl PrunedBanksRequestHandler {
pub fn handle_request ( & self , bank : & Bank , is_serialized_with_abs : bool ) -> usize {
let mut count = 0 ;
for ( pruned_slot , pruned_bank_id ) in self . pruned_banks_receiver . try_iter ( ) {
count + = 1 ;
bank . rc . accounts . accounts_db . purge_slot (
pruned_slot ,
pruned_bank_id ,
is_serialized_with_abs ,
) ;
}
count
}
fn remove_dead_slots (
& self ,
bank : & Bank ,
removed_slots_count : & mut usize ,
total_remove_slots_time : & mut u64 ,
) {
let mut remove_slots_time = Measure ::start ( " remove_slots_time " ) ;
* removed_slots_count + = self . handle_request ( bank , true ) ;
remove_slots_time . stop ( ) ;
* total_remove_slots_time + = remove_slots_time . as_us ( ) ;
if * removed_slots_count > = 100 {
datapoint_info! (
" remove_slots_timing " ,
( " remove_slots_time " , * total_remove_slots_time , i64 ) ,
( " removed_slots_count " , * removed_slots_count , i64 ) ,
) ;
* total_remove_slots_time = 0 ;
* removed_slots_count = 0 ;
}
}
}
pub struct AbsRequestHandlers {
2022-09-07 07:08:42 -07:00
pub snapshot_request_handler : SnapshotRequestHandler ,
2022-08-29 10:30:06 -07:00
pub pruned_banks_request_handler : PrunedBanksRequestHandler ,
}
impl AbsRequestHandlers {
2020-12-12 17:22:34 -08:00
// Returns the latest requested snapshot block height, if one exists
2021-02-04 07:00:33 -08:00
pub fn handle_snapshot_requests (
& self ,
accounts_db_caching_enabled : bool ,
test_hash_calculation : bool ,
2021-06-14 13:46:19 -07:00
non_snapshot_time_us : u128 ,
2021-08-31 16:33:27 -07:00
last_full_snapshot_slot : & mut Option < Slot > ,
) -> Option < Result < u64 , SnapshotError > > {
2022-09-07 07:08:42 -07:00
self . snapshot_request_handler . handle_snapshot_requests (
accounts_db_caching_enabled ,
test_hash_calculation ,
non_snapshot_time_us ,
last_full_snapshot_slot ,
)
2020-12-12 17:22:34 -08:00
}
}
/// Owns the background thread spawned by `AccountsBackgroundService::new`.
pub struct AccountsBackgroundService {
    /// Join handle of the background thread.
    t_background: JoinHandle<()>,
}
impl AccountsBackgroundService {
pub fn new (
bank_forks : Arc < RwLock < BankForks > > ,
exit : & Arc < AtomicBool > ,
2022-08-29 10:30:06 -07:00
request_handlers : AbsRequestHandlers ,
2021-01-11 17:00:23 -08:00
accounts_db_caching_enabled : bool ,
2021-02-04 07:00:33 -08:00
test_hash_calculation : bool ,
2021-08-31 16:33:27 -07:00
mut last_full_snapshot_slot : Option < Slot > ,
2020-09-28 16:04:46 -07:00
) -> Self {
info! ( " AccountsBackgroundService active " ) ;
let exit = exit . clone ( ) ;
let mut consumed_budget = 0 ;
let mut last_cleaned_block_height = 0 ;
2020-12-12 17:22:34 -08:00
let mut removed_slots_count = 0 ;
let mut total_remove_slots_time = 0 ;
2021-02-25 00:27:27 -08:00
let mut last_expiration_check_time = Instant ::now ( ) ;
2020-09-28 16:04:46 -07:00
let t_background = Builder ::new ( )
2022-08-17 08:40:23 -07:00
. name ( " solBgAccounts " . to_string ( ) )
2021-06-14 13:46:19 -07:00
. spawn ( move | | {
2022-06-29 19:21:30 -07:00
let mut stats = StatsManager ::new ( ) ;
2021-06-14 13:46:19 -07:00
let mut last_snapshot_end_time = None ;
loop {
if exit . load ( Ordering ::Relaxed ) {
break ;
}
2022-06-29 19:21:30 -07:00
let start_time = Instant ::now ( ) ;
2020-09-28 16:04:46 -07:00
2021-06-14 13:46:19 -07:00
// Grab the current root bank
let bank = bank_forks . read ( ) . unwrap ( ) . root_bank ( ) . clone ( ) ;
2020-09-28 16:04:46 -07:00
2021-06-14 13:46:19 -07:00
// Purge accounts of any dead slots
2022-08-29 10:30:06 -07:00
request_handlers
. pruned_banks_request_handler
. remove_dead_slots (
& bank ,
& mut removed_slots_count ,
& mut total_remove_slots_time ,
) ;
2020-12-12 17:22:34 -08:00
2021-06-14 13:46:19 -07:00
Self ::expire_old_recycle_stores ( & bank , & mut last_expiration_check_time ) ;
let non_snapshot_time = last_snapshot_end_time
. map ( | last_snapshot_end_time : Instant | {
last_snapshot_end_time . elapsed ( ) . as_micros ( )
} )
. unwrap_or_default ( ) ;
// Check to see if there were any requests for snapshotting banks
// < the current root bank `bank` above.
// Claim: Any snapshot request for slot `N` found here implies that the last cleanup
// slot `M` satisfies `M < N`
//
// Proof: Assume for contradiction that we find a snapshot request for slot `N` here,
// but cleanup has already happened on some slot `M >= N`. Because the call to
// `bank.clean_accounts(true)` (in the code below) implies we only clean slots `<= bank - 1`,
// then that means in some *previous* iteration of this loop, we must have gotten a root
// bank for slot some slot `R` where `R > N`, but did not see the snapshot for `N` in the
// snapshot request channel.
//
// However, this is impossible because BankForks.set_root() will always flush the snapshot
// request for `N` to the snapshot request channel before setting a root `R > N`, and
// snapshot_request_handler.handle_requests() will always look for the latest
// available snapshot in the channel.
2022-08-29 10:30:06 -07:00
let snapshot_block_height_option_result = request_handlers
2021-08-31 16:33:27 -07:00
. handle_snapshot_requests (
accounts_db_caching_enabled ,
test_hash_calculation ,
non_snapshot_time ,
& mut last_full_snapshot_slot ,
) ;
if snapshot_block_height_option_result . is_some ( ) {
2021-06-14 13:46:19 -07:00
last_snapshot_end_time = Some ( Instant ::now ( ) ) ;
}
2020-09-28 16:04:46 -07:00
2021-01-11 17:00:23 -08:00
if accounts_db_caching_enabled {
2021-06-14 13:46:19 -07:00
// Note that the flush will do an internal clean of the
// cache up to bank.slot(), so should be safe as long
// as any later snapshots that are taken are of
// slots >= bank.slot()
bank . flush_accounts_cache_if_needed ( ) ;
2021-01-11 17:00:23 -08:00
}
2021-06-14 13:46:19 -07:00
2021-08-31 16:33:27 -07:00
if let Some ( snapshot_block_height_result ) = snapshot_block_height_option_result
{
2021-06-14 13:46:19 -07:00
// Safe, see proof above
2021-08-31 16:33:27 -07:00
if let Ok ( snapshot_block_height ) = snapshot_block_height_result {
assert! ( last_cleaned_block_height < = snapshot_block_height ) ;
last_cleaned_block_height = snapshot_block_height ;
} else {
exit . store ( true , Ordering ::Relaxed ) ;
return ;
}
2021-06-14 13:46:19 -07:00
} else {
2021-01-11 17:00:23 -08:00
if accounts_db_caching_enabled {
2021-06-14 13:46:19 -07:00
bank . shrink_candidate_slots ( ) ;
} else {
// under sustained writes, shrink can lag behind so cap to
// SHRUNKEN_ACCOUNT_PER_INTERVAL (which is based on INTERVAL_MS,
// which in turn roughly associated block time)
consumed_budget = bank
. process_stale_slot_with_budget (
consumed_budget ,
SHRUNKEN_ACCOUNT_PER_INTERVAL ,
)
. min ( SHRUNKEN_ACCOUNT_PER_INTERVAL ) ;
}
if bank . block_height ( ) - last_cleaned_block_height
> ( CLEAN_INTERVAL_BLOCKS + thread_rng ( ) . gen_range ( 0 , 10 ) )
{
if accounts_db_caching_enabled {
// Note that the flush will do an internal clean of the
// cache up to bank.slot(), so should be safe as long
// as any later snapshots that are taken are of
// slots >= bank.slot()
bank . force_flush_accounts_cache ( ) ;
}
2022-08-19 15:15:04 -07:00
bank . clean_accounts ( last_full_snapshot_slot ) ;
2021-06-14 13:46:19 -07:00
last_cleaned_block_height = bank . block_height ( ) ;
2021-01-11 17:00:23 -08:00
}
2020-09-28 16:04:46 -07:00
}
2022-06-29 19:21:30 -07:00
stats . record_and_maybe_submit ( start_time . elapsed ( ) ) ;
2021-06-14 13:46:19 -07:00
sleep ( Duration ::from_millis ( INTERVAL_MS ) ) ;
2020-09-28 16:04:46 -07:00
}
} )
. unwrap ( ) ;
Self { t_background }
}
2022-08-04 13:44:31 -07:00
/// Should be called immediately after bank_fork_utils::load_bank_forks(), and as such, there
/// should only be one bank, the root bank, in `bank_forks`
/// All banks added to `bank_forks` will be descended from the root bank, and thus will inherit
/// the bank drop callback.
pub fn setup_bank_drop_callback ( bank_forks : Arc < RwLock < BankForks > > ) -> DroppedSlotsReceiver {
assert_eq! ( bank_forks . read ( ) . unwrap ( ) . banks ( ) . len ( ) , 1 ) ;
2022-08-24 10:48:23 -07:00
let ( pruned_banks_sender , pruned_banks_receiver ) = crossbeam_channel ::unbounded ( ) ;
2022-08-04 13:44:31 -07:00
{
let root_bank = bank_forks . read ( ) . unwrap ( ) . root_bank ( ) ;
root_bank . set_callback ( Some ( Box ::new (
root_bank
. rc
. accounts
. accounts_db
. create_drop_bank_callback ( pruned_banks_sender ) ,
) ) ) ;
}
pruned_banks_receiver
}
2020-09-28 16:04:46 -07:00
pub fn join ( self ) -> thread ::Result < ( ) > {
self . t_background . join ( )
}
2020-12-12 17:22:34 -08:00
2021-02-25 00:27:27 -08:00
fn expire_old_recycle_stores ( bank : & Bank , last_expiration_check_time : & mut Instant ) {
let now = Instant ::now ( ) ;
if now . duration_since ( * last_expiration_check_time ) . as_secs ( )
> RECYCLE_STORE_EXPIRATION_INTERVAL_SECS
{
bank . expire_old_recycle_stores ( ) ;
* last_expiration_check_time = now ;
}
}
2020-12-12 17:22:34 -08:00
}
2022-09-20 17:31:08 -07:00
/// Get the AccountsPackageType from a given SnapshotRequest
#[ must_use ]
fn new_accounts_package_type (
snapshot_request : & SnapshotRequest ,
snapshot_config : & SnapshotConfig ,
last_full_snapshot_slot : Option < Slot > ,
) -> AccountsPackageType {
let block_height = snapshot_request . snapshot_root_bank . block_height ( ) ;
match snapshot_request . request_type {
SnapshotRequestType ::EpochAccountsHash = > AccountsPackageType ::EpochAccountsHash ,
_ = > {
if snapshot_utils ::should_take_full_snapshot (
block_height ,
snapshot_config . full_snapshot_archive_interval_slots ,
) {
AccountsPackageType ::Snapshot ( SnapshotType ::FullSnapshot )
} else if snapshot_utils ::should_take_incremental_snapshot (
block_height ,
snapshot_config . incremental_snapshot_archive_interval_slots ,
last_full_snapshot_slot ,
) {
AccountsPackageType ::Snapshot ( SnapshotType ::IncrementalSnapshot (
last_full_snapshot_slot . unwrap ( ) ,
) )
} else {
AccountsPackageType ::AccountsHashVerifier
}
}
}
}
2022-09-21 09:39:09 -07:00
/// Compare snapshot requests; used to pick the highest priority request to handle.
///
/// Priority, from highest to lowest:
/// - Epoch Accounts Hash
/// - Full Snapshot
/// - Incremental Snapshot
/// - Accounts Hash Verifier
///
/// If two snapshots of the same type are being compared, their bank slots are tiebreakers.
#[ must_use ]
fn cmp_snapshot_requests (
a : & ( SnapshotRequest , AccountsPackageType ) ,
b : & ( SnapshotRequest , AccountsPackageType ) ,
) -> std ::cmp ::Ordering {
let ( snapshot_request_a , accounts_package_type_a ) = a ;
let ( snapshot_request_b , accounts_package_type_b ) = b ;
let slot_a = snapshot_request_a . snapshot_root_bank . slot ( ) ;
let slot_b = snapshot_request_b . snapshot_root_bank . slot ( ) ;
use { AccountsPackageType ::* , SnapshotType ::* } ;
match ( accounts_package_type_a , accounts_package_type_b ) {
// Epoch Accounts Hash packages
( EpochAccountsHash , EpochAccountsHash ) = > {
panic! ( " Only a single EAH snapshot request is allowed at a time " )
}
( EpochAccountsHash , _ ) = > std ::cmp ::Ordering ::Greater ,
( _ , EpochAccountsHash ) = > std ::cmp ::Ordering ::Less ,
// Snapshot packages
( Snapshot ( snapshot_type_a ) , Snapshot ( snapshot_type_b ) ) = > {
match ( snapshot_type_a , snapshot_type_b ) {
( FullSnapshot , FullSnapshot ) = > slot_a . cmp ( & slot_b ) ,
( FullSnapshot , IncrementalSnapshot ( _ ) ) = > std ::cmp ::Ordering ::Greater ,
( IncrementalSnapshot ( _ ) , FullSnapshot ) = > std ::cmp ::Ordering ::Less ,
( IncrementalSnapshot ( base_slot_a ) , IncrementalSnapshot ( base_slot_b ) ) = > {
slot_a . cmp ( & slot_b ) . then ( base_slot_a . cmp ( base_slot_b ) )
}
}
}
( Snapshot ( _ ) , _ ) = > std ::cmp ::Ordering ::Greater ,
( _ , Snapshot ( _ ) ) = > std ::cmp ::Ordering ::Less ,
// Accounts Hash Verifier packages
( AccountsHashVerifier , AccountsHashVerifier ) = > slot_a . cmp ( & slot_b ) ,
}
}
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::genesis_utils::create_genesis_config,
        crossbeam_channel::unbounded,
        solana_sdk::{account::AccountSharedData, pubkey::Pubkey},
    };

    /// Map an accounts package type back to the request type that produces it.
    fn new_snapshot_request_type(
        accounts_package_type: &AccountsPackageType,
    ) -> SnapshotRequestType {
        match accounts_package_type {
            AccountsPackageType::AccountsHashVerifier => SnapshotRequestType::Snapshot,
            AccountsPackageType::Snapshot(_) => SnapshotRequestType::Snapshot,
            AccountsPackageType::EpochAccountsHash => SnapshotRequestType::EpochAccountsHash,
        }
    }

    /// Build the `(request, package type)` pair that `cmp_snapshot_requests` operates on.
    fn new_request(
        bank: &Arc<Bank>,
        accounts_package_type: AccountsPackageType,
    ) -> (SnapshotRequest, AccountsPackageType) {
        let snapshot_request = SnapshotRequest {
            snapshot_root_bank: Arc::clone(bank),
            status_cache_slot_deltas: Vec::default(),
            request_type: new_snapshot_request_type(&accounts_package_type),
        };
        (snapshot_request, accounts_package_type)
    }

    #[test]
    fn test_accounts_background_service_remove_dead_slots() {
        let genesis = create_genesis_config(10);
        let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };

        // Store an account in slot 0
        let account_key = Pubkey::new_unique();
        bank0.store_account(
            &account_key,
            &AccountSharedData::new(264, 0, &Pubkey::default()),
        );
        assert!(bank0.get_account(&account_key).is_some());

        // Announce slot 0 as dropped; nothing is purged until the handler runs.
        pruned_banks_sender.send((0, 0)).unwrap();
        assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());

        let mut removed_slots_count = 0;
        let mut total_remove_slots_time = 0;
        pruned_banks_request_handler.remove_dead_slots(
            &bank0,
            &mut removed_slots_count,
            &mut total_remove_slots_time,
        );

        assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
    }

    #[test]
    fn test_cmp_snapshot_requests() {
        use std::cmp::Ordering::{Equal, Greater, Less};

        const EAH: AccountsPackageType = AccountsPackageType::EpochAccountsHash;
        const FULL: AccountsPackageType = AccountsPackageType::Snapshot(SnapshotType::FullSnapshot);
        const AHV: AccountsPackageType = AccountsPackageType::AccountsHashVerifier;
        let incremental = |base_slot: Slot| {
            AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(base_slot))
        };

        let genesis_config_info = create_genesis_config(10);
        let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));

        // Every request uses the same bank, so only the package types decide the ordering.
        let test_cases = [
            (EAH, FULL, Greater),
            (EAH, incremental(5), Greater),
            (EAH, AHV, Greater),
            (FULL, EAH, Less),
            (FULL, FULL, Equal),
            (FULL, incremental(5), Greater),
            (FULL, AHV, Greater),
            (incremental(5), EAH, Less),
            (incremental(5), FULL, Less),
            (incremental(5), incremental(6), Less),
            (incremental(5), incremental(5), Equal),
            (incremental(5), incremental(4), Greater),
            (incremental(5), AHV, Greater),
            (AHV, AHV, Equal),
        ];

        for (accounts_package_type_a, accounts_package_type_b, expected_result) in test_cases {
            let request_a = new_request(&bank, accounts_package_type_a);
            let request_b = new_request(&bank, accounts_package_type_b);
            let actual_result = cmp_snapshot_requests(&request_a, &request_b);
            assert_eq!(expected_result, actual_result);
        }
    }

    #[test]
    #[should_panic]
    fn test_cmp_snapshot_requests_both_eah() {
        let genesis_config_info = create_genesis_config(10);
        let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));

        let request_a = new_request(&bank, AccountsPackageType::EpochAccountsHash);
        let request_b = new_request(&bank, AccountsPackageType::EpochAccountsHash);

        // Comparing two EAH requests is a logic error and must panic.
        let _ = cmp_snapshot_requests(&request_a, &request_b);
    }
}