use exit signal for acct idx bg threads (#27483)

This commit is contained in:
Jeff Washington (jwash) 2022-09-12 11:51:12 -07:00 committed by GitHub
parent f0770c199e
commit 765c628546
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 140 additions and 41 deletions

View File

@ -1463,6 +1463,7 @@ fn load_blockstore(
.cache_block_meta_sender .cache_block_meta_sender
.as_ref(), .as_ref(),
accounts_update_notifier, accounts_update_notifier,
exit,
); );
// Before replay starts, set the callbacks in each of the banks in BankForks so that // Before replay starts, set the callbacks in each of the banks in BankForks so that

View File

@ -176,6 +176,7 @@ fn restore_from_snapshot(
false, false,
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
.unwrap(); .unwrap();
@ -849,6 +850,7 @@ fn restore_from_snapshots_and_check_banks_are_equal(
false, false,
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
)?; )?;
assert_eq!(bank, &deserialized_bank); assert_eq!(bank, &deserialized_bank);
@ -1044,6 +1046,7 @@ fn test_snapshots_with_background_services(
false, false,
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
.unwrap(); .unwrap();

View File

@ -1093,6 +1093,7 @@ fn load_bank_forks(
&process_options, &process_options,
None, None,
accounts_update_notifier, accounts_update_notifier,
&Arc::default(),
); );
let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded(); let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();

View File

@ -22,7 +22,7 @@ use {
fs, fs,
path::PathBuf, path::PathBuf,
process, result, process, result,
sync::{Arc, RwLock}, sync::{atomic::AtomicBool, Arc, RwLock},
}, },
}; };
@ -50,6 +50,7 @@ pub fn load(
transaction_status_sender: Option<&TransactionStatusSender>, transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: &Arc<AtomicBool>,
) -> LoadResult { ) -> LoadResult {
let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) = load_bank_forks( let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) = load_bank_forks(
genesis_config, genesis_config,
@ -60,6 +61,7 @@ pub fn load(
&process_options, &process_options,
cache_block_meta_sender, cache_block_meta_sender,
accounts_update_notifier, accounts_update_notifier,
exit,
); );
blockstore_processor::process_blockstore_from_root( blockstore_processor::process_blockstore_from_root(
@ -84,6 +86,7 @@ pub fn load_bank_forks(
process_options: &ProcessOptions, process_options: &ProcessOptions,
cache_block_meta_sender: Option<&CacheBlockMetaSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: &Arc<AtomicBool>,
) -> ( ) -> (
Arc<RwLock<BankForks>>, Arc<RwLock<BankForks>>,
LeaderScheduleCache, LeaderScheduleCache,
@ -124,6 +127,7 @@ pub fn load_bank_forks(
snapshot_config.as_ref().unwrap(), snapshot_config.as_ref().unwrap(),
process_options, process_options,
accounts_update_notifier, accounts_update_notifier,
exit,
) )
} else { } else {
let maybe_filler_accounts = process_options let maybe_filler_accounts = process_options
@ -143,6 +147,7 @@ pub fn load_bank_forks(
process_options, process_options,
cache_block_meta_sender, cache_block_meta_sender,
accounts_update_notifier, accounts_update_notifier,
exit,
); );
bank_forks bank_forks
.read() .read()
@ -186,6 +191,7 @@ fn bank_forks_from_snapshot(
snapshot_config: &SnapshotConfig, snapshot_config: &SnapshotConfig,
process_options: &ProcessOptions, process_options: &ProcessOptions,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: &Arc<AtomicBool>,
) -> (Arc<RwLock<BankForks>>, Option<StartingSnapshotHashes>) { ) -> (Arc<RwLock<BankForks>>, Option<StartingSnapshotHashes>) {
// Fail hard here if snapshot fails to load, don't silently continue // Fail hard here if snapshot fails to load, don't silently continue
if account_paths.is_empty() { if account_paths.is_empty() {
@ -214,6 +220,7 @@ fn bank_forks_from_snapshot(
process_options.verify_index, process_options.verify_index,
process_options.accounts_db_config.clone(), process_options.accounts_db_config.clone(),
accounts_update_notifier, accounts_update_notifier,
exit,
) )
.expect("Load from snapshot failed"); .expect("Load from snapshot failed");

View File

@ -56,7 +56,7 @@ use {
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
path::PathBuf, path::PathBuf,
result, result,
sync::{Arc, Mutex, RwLock}, sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
time::{Duration, Instant}, time::{Duration, Instant},
}, },
thiserror::Error, thiserror::Error,
@ -722,6 +722,7 @@ pub fn test_process_blockstore(
genesis_config: &GenesisConfig, genesis_config: &GenesisConfig,
blockstore: &Blockstore, blockstore: &Blockstore,
opts: &ProcessOptions, opts: &ProcessOptions,
exit: &Arc<AtomicBool>,
) -> (Arc<RwLock<BankForks>>, LeaderScheduleCache) { ) -> (Arc<RwLock<BankForks>>, LeaderScheduleCache) {
let (bank_forks, leader_schedule_cache, ..) = crate::bank_forks_utils::load_bank_forks( let (bank_forks, leader_schedule_cache, ..) = crate::bank_forks_utils::load_bank_forks(
genesis_config, genesis_config,
@ -732,6 +733,7 @@ pub fn test_process_blockstore(
opts, opts,
None, None,
None, None,
exit,
); );
process_blockstore_from_root( process_blockstore_from_root(
blockstore, blockstore,
@ -753,6 +755,7 @@ pub(crate) fn process_blockstore_for_bank_0(
opts: &ProcessOptions, opts: &ProcessOptions,
cache_block_meta_sender: Option<&CacheBlockMetaSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: &Arc<AtomicBool>,
) -> Arc<RwLock<BankForks>> { ) -> Arc<RwLock<BankForks>> {
// Setup bank for slot 0 // Setup bank for slot 0
let bank0 = Bank::new_with_paths( let bank0 = Bank::new_with_paths(
@ -767,6 +770,7 @@ pub(crate) fn process_blockstore_for_bank_0(
false, false,
opts.accounts_db_config.clone(), opts.accounts_db_config.clone(),
accounts_update_notifier, accounts_update_notifier,
exit,
); );
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0))); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0)));
@ -1842,7 +1846,7 @@ pub mod tests {
AccessType::Primary | AccessType::PrimaryForMaintenance => { AccessType::Primary | AccessType::PrimaryForMaintenance => {
// Attempting to open a second Primary access would fail, so // Attempting to open a second Primary access would fail, so
// just pass the original session if it is a Primary variant // just pass the original session if it is a Primary variant
test_process_blockstore(genesis_config, blockstore, opts) test_process_blockstore(genesis_config, blockstore, opts, &Arc::default())
} }
AccessType::Secondary => { AccessType::Secondary => {
let secondary_blockstore = Blockstore::open_with_options( let secondary_blockstore = Blockstore::open_with_options(
@ -1853,7 +1857,12 @@ pub mod tests {
}, },
) )
.expect("Unable to open access to blockstore"); .expect("Unable to open access to blockstore");
test_process_blockstore(genesis_config, &secondary_blockstore, opts) test_process_blockstore(
genesis_config,
&secondary_blockstore,
opts,
&Arc::default(),
)
} }
} }
} }
@ -1962,6 +1971,7 @@ pub mod tests {
poh_verify: true, poh_verify: true,
..ProcessOptions::default() ..ProcessOptions::default()
}, },
&Arc::default(),
); );
assert_eq!(frozen_bank_slots(&bank_forks.read().unwrap()), vec![0]); assert_eq!(frozen_bank_slots(&bank_forks.read().unwrap()), vec![0]);
@ -1976,6 +1986,7 @@ pub mod tests {
poh_verify: true, poh_verify: true,
..ProcessOptions::default() ..ProcessOptions::default()
}, },
&Arc::default(),
); );
// One valid fork, one bad fork. process_blockstore() should only return the valid fork // One valid fork, one bad fork. process_blockstore() should only return the valid fork
@ -2030,7 +2041,8 @@ pub mod tests {
accounts_db_test_hash_calculation: true, accounts_db_test_hash_calculation: true,
..ProcessOptions::default() ..ProcessOptions::default()
}; };
let (bank_forks, ..) = test_process_blockstore(&genesis_config, &blockstore, &opts); let (bank_forks, ..) =
test_process_blockstore(&genesis_config, &blockstore, &opts, &Arc::default());
assert_eq!(frozen_bank_slots(&bank_forks.read().unwrap()), vec![0]); assert_eq!(frozen_bank_slots(&bank_forks.read().unwrap()), vec![0]);
} }
@ -2094,7 +2106,8 @@ pub mod tests {
accounts_db_test_hash_calculation: true, accounts_db_test_hash_calculation: true,
..ProcessOptions::default() ..ProcessOptions::default()
}; };
let (bank_forks, ..) = test_process_blockstore(&genesis_config, &blockstore, &opts); let (bank_forks, ..) =
test_process_blockstore(&genesis_config, &blockstore, &opts, &Arc::default());
assert_eq!(frozen_bank_slots(&bank_forks.read().unwrap()), vec![0]); // slot 1 isn't "full", we stop at slot zero assert_eq!(frozen_bank_slots(&bank_forks.read().unwrap()), vec![0]); // slot 1 isn't "full", we stop at slot zero
@ -2113,7 +2126,8 @@ pub mod tests {
}; };
fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, 3, 0, blockhash); fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, 3, 0, blockhash);
// Slot 0 should not show up in the ending bank_forks_info // Slot 0 should not show up in the ending bank_forks_info
let (bank_forks, ..) = test_process_blockstore(&genesis_config, &blockstore, &opts); let (bank_forks, ..) =
test_process_blockstore(&genesis_config, &blockstore, &opts, &Arc::default());
// slot 1 isn't "full", we stop at slot zero // slot 1 isn't "full", we stop at slot zero
assert_eq!(frozen_bank_slots(&bank_forks.read().unwrap()), vec![0, 3]); assert_eq!(frozen_bank_slots(&bank_forks.read().unwrap()), vec![0, 3]);
@ -2179,7 +2193,8 @@ pub mod tests {
accounts_db_test_hash_calculation: true, accounts_db_test_hash_calculation: true,
..ProcessOptions::default() ..ProcessOptions::default()
}; };
let (bank_forks, ..) = test_process_blockstore(&genesis_config, &blockstore, &opts); let (bank_forks, ..) =
test_process_blockstore(&genesis_config, &blockstore, &opts, &Arc::default());
let bank_forks = bank_forks.read().unwrap(); let bank_forks = bank_forks.read().unwrap();
// One fork, other one is ignored b/c not a descendant of the root // One fork, other one is ignored b/c not a descendant of the root
@ -2258,7 +2273,8 @@ pub mod tests {
accounts_db_test_hash_calculation: true, accounts_db_test_hash_calculation: true,
..ProcessOptions::default() ..ProcessOptions::default()
}; };
let (bank_forks, ..) = test_process_blockstore(&genesis_config, &blockstore, &opts); let (bank_forks, ..) =
test_process_blockstore(&genesis_config, &blockstore, &opts, &Arc::default());
let bank_forks = bank_forks.read().unwrap(); let bank_forks = bank_forks.read().unwrap();
assert_eq!(frozen_bank_slots(&bank_forks), vec![1, 2, 3, 4]); assert_eq!(frozen_bank_slots(&bank_forks), vec![1, 2, 3, 4]);
@ -2314,8 +2330,12 @@ pub mod tests {
blockstore.set_dead_slot(2).unwrap(); blockstore.set_dead_slot(2).unwrap();
fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, 3, 1, slot1_blockhash); fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, 3, 1, slot1_blockhash);
let (bank_forks, ..) = let (bank_forks, ..) = test_process_blockstore(
test_process_blockstore(&genesis_config, &blockstore, &ProcessOptions::default()); &genesis_config,
&blockstore,
&ProcessOptions::default(),
&Arc::default(),
);
let bank_forks = bank_forks.read().unwrap(); let bank_forks = bank_forks.read().unwrap();
assert_eq!(frozen_bank_slots(&bank_forks), vec![0, 1, 3]); assert_eq!(frozen_bank_slots(&bank_forks), vec![0, 1, 3]);
@ -2359,8 +2379,12 @@ pub mod tests {
blockstore.set_dead_slot(4).unwrap(); blockstore.set_dead_slot(4).unwrap();
fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, 3, 1, slot1_blockhash); fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, 3, 1, slot1_blockhash);
let (bank_forks, ..) = let (bank_forks, ..) = test_process_blockstore(
test_process_blockstore(&genesis_config, &blockstore, &ProcessOptions::default()); &genesis_config,
&blockstore,
&ProcessOptions::default(),
&Arc::default(),
);
let bank_forks = bank_forks.read().unwrap(); let bank_forks = bank_forks.read().unwrap();
// Should see the parent of the dead child // Should see the parent of the dead child
@ -2407,8 +2431,12 @@ pub mod tests {
fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, 2, 0, blockhash); fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, 2, 0, blockhash);
blockstore.set_dead_slot(1).unwrap(); blockstore.set_dead_slot(1).unwrap();
blockstore.set_dead_slot(2).unwrap(); blockstore.set_dead_slot(2).unwrap();
let (bank_forks, ..) = let (bank_forks, ..) = test_process_blockstore(
test_process_blockstore(&genesis_config, &blockstore, &ProcessOptions::default()); &genesis_config,
&blockstore,
&ProcessOptions::default(),
&Arc::default(),
);
let bank_forks = bank_forks.read().unwrap(); let bank_forks = bank_forks.read().unwrap();
// Should see only the parent of the dead children // Should see only the parent of the dead children
@ -2459,7 +2487,8 @@ pub mod tests {
accounts_db_test_hash_calculation: true, accounts_db_test_hash_calculation: true,
..ProcessOptions::default() ..ProcessOptions::default()
}; };
let (bank_forks, ..) = test_process_blockstore(&genesis_config, &blockstore, &opts); let (bank_forks, ..) =
test_process_blockstore(&genesis_config, &blockstore, &opts, &Arc::default());
let bank_forks = bank_forks.read().unwrap(); let bank_forks = bank_forks.read().unwrap();
// There is one fork, head is last_slot + 1 // There is one fork, head is last_slot + 1
@ -2603,7 +2632,8 @@ pub mod tests {
accounts_db_test_hash_calculation: true, accounts_db_test_hash_calculation: true,
..ProcessOptions::default() ..ProcessOptions::default()
}; };
let (bank_forks, ..) = test_process_blockstore(&genesis_config, &blockstore, &opts); let (bank_forks, ..) =
test_process_blockstore(&genesis_config, &blockstore, &opts, &Arc::default());
let bank_forks = bank_forks.read().unwrap(); let bank_forks = bank_forks.read().unwrap();
assert_eq!(frozen_bank_slots(&bank_forks), vec![0, 1]); assert_eq!(frozen_bank_slots(&bank_forks), vec![0, 1]);
@ -2633,7 +2663,8 @@ pub mod tests {
accounts_db_test_hash_calculation: true, accounts_db_test_hash_calculation: true,
..ProcessOptions::default() ..ProcessOptions::default()
}; };
let (bank_forks, ..) = test_process_blockstore(&genesis_config, &blockstore, &opts); let (bank_forks, ..) =
test_process_blockstore(&genesis_config, &blockstore, &opts, &Arc::default());
let bank_forks = bank_forks.read().unwrap(); let bank_forks = bank_forks.read().unwrap();
assert_eq!(frozen_bank_slots(&bank_forks), vec![0]); assert_eq!(frozen_bank_slots(&bank_forks), vec![0]);
@ -2653,7 +2684,7 @@ pub mod tests {
..ProcessOptions::default() ..ProcessOptions::default()
}; };
let (_bank_forks, leader_schedule) = let (_bank_forks, leader_schedule) =
test_process_blockstore(&genesis_config, &blockstore, &opts); test_process_blockstore(&genesis_config, &blockstore, &opts, &Arc::default());
assert_eq!(leader_schedule.max_schedules(), std::usize::MAX); assert_eq!(leader_schedule.max_schedules(), std::usize::MAX);
} }
@ -2712,7 +2743,7 @@ pub mod tests {
accounts_db_test_hash_calculation: true, accounts_db_test_hash_calculation: true,
..ProcessOptions::default() ..ProcessOptions::default()
}; };
test_process_blockstore(&genesis_config, &blockstore, &opts); test_process_blockstore(&genesis_config, &blockstore, &opts, &Arc::default());
assert_eq!(*callback_counter.write().unwrap(), 2); assert_eq!(*callback_counter.write().unwrap(), 2);
} }
@ -3366,7 +3397,8 @@ pub mod tests {
accounts_db_test_hash_calculation: true, accounts_db_test_hash_calculation: true,
..ProcessOptions::default() ..ProcessOptions::default()
}; };
let (bank_forks, ..) = test_process_blockstore(&genesis_config, &blockstore, &opts); let (bank_forks, ..) =
test_process_blockstore(&genesis_config, &blockstore, &opts, &Arc::default());
let bank_forks = bank_forks.read().unwrap(); let bank_forks = bank_forks.read().unwrap();
// Should be able to fetch slot 0 because we specified halting at slot 0, even // Should be able to fetch slot 0 because we specified halting at slot 0, even

View File

@ -2180,6 +2180,7 @@ fn create_snapshot_to_hard_fork(
None, None,
None, None,
None, None,
&Arc::default(),
) )
.unwrap(); .unwrap();
let bank = bank_forks.read().unwrap().get(snapshot_slot).unwrap(); let bank = bank_forks.read().unwrap().get(snapshot_slot).unwrap();

View File

@ -12,6 +12,7 @@ use {
}, },
}, },
solana_sdk::{account::AccountSharedData, pubkey}, solana_sdk::{account::AccountSharedData, pubkey},
std::sync::Arc,
test::Bencher, test::Bencher,
}; };
@ -23,7 +24,10 @@ fn bench_accounts_index(bencher: &mut Bencher) {
const NUM_FORKS: u64 = 16; const NUM_FORKS: u64 = 16;
let mut reclaims = vec![]; let mut reclaims = vec![];
let index = AccountsIndex::<AccountInfo>::new(Some(ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS)); let index = AccountsIndex::<AccountInfo>::new(
Some(ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS),
&Arc::default(),
);
for f in 0..NUM_FORKS { for f in 0..NUM_FORKS {
for pubkey in pubkeys.iter().take(NUM_PUBKEYS) { for pubkey in pubkeys.iter().take(NUM_PUBKEYS) {
index.upsert( index.upsert(

View File

@ -64,7 +64,7 @@ use {
ops::RangeBounds, ops::RangeBounds,
path::PathBuf, path::PathBuf,
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex, Arc, Mutex,
}, },
}, },
@ -167,6 +167,7 @@ impl Accounts {
shrink_ratio, shrink_ratio,
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
} }
@ -185,6 +186,7 @@ impl Accounts {
shrink_ratio, shrink_ratio,
Some(ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS), Some(ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS),
None, None,
&Arc::default(),
) )
} }
@ -196,6 +198,7 @@ impl Accounts {
shrink_ratio: AccountShrinkThreshold, shrink_ratio: AccountShrinkThreshold,
accounts_db_config: Option<AccountsDbConfig>, accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
Self { Self {
accounts_db: Arc::new(AccountsDb::new_with_config( accounts_db: Arc::new(AccountsDb::new_with_config(
@ -206,6 +209,7 @@ impl Accounts {
shrink_ratio, shrink_ratio,
accounts_db_config, accounts_db_config,
accounts_update_notifier, accounts_update_notifier,
exit,
)), )),
account_locks: Mutex::new(AccountLocks::default()), account_locks: Mutex::new(AccountLocks::default()),
} }

View File

@ -2014,6 +2014,7 @@ impl AccountsDb {
AccountShrinkThreshold::default(), AccountShrinkThreshold::default(),
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
} }
@ -2026,6 +2027,7 @@ impl AccountsDb {
AccountShrinkThreshold::default(), AccountShrinkThreshold::default(),
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
} }
@ -2037,9 +2039,12 @@ impl AccountsDb {
shrink_ratio: AccountShrinkThreshold, shrink_ratio: AccountShrinkThreshold,
mut accounts_db_config: Option<AccountsDbConfig>, mut accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
let accounts_index = let accounts_index = AccountsIndex::new(
AccountsIndex::new(accounts_db_config.as_mut().and_then(|x| x.index.take())); accounts_db_config.as_mut().and_then(|x| x.index.take()),
exit,
);
let accounts_hash_cache_path = accounts_db_config let accounts_hash_cache_path = accounts_db_config
.as_ref() .as_ref()
.and_then(|x| x.accounts_hash_cache_path.clone()); .and_then(|x| x.accounts_hash_cache_path.clone());
@ -9194,6 +9199,7 @@ impl AccountsDb {
shrink_ratio, shrink_ratio,
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
} }

View File

@ -702,14 +702,14 @@ pub struct AccountsIndex<T: IndexValue> {
impl<T: IndexValue> AccountsIndex<T> { impl<T: IndexValue> AccountsIndex<T> {
pub fn default_for_tests() -> Self { pub fn default_for_tests() -> Self {
Self::new(Some(ACCOUNTS_INDEX_CONFIG_FOR_TESTING)) Self::new(Some(ACCOUNTS_INDEX_CONFIG_FOR_TESTING), &Arc::default())
} }
pub fn new(config: Option<AccountsIndexConfig>) -> Self { pub fn new(config: Option<AccountsIndexConfig>, exit: &Arc<AtomicBool>) -> Self {
let scan_results_limit_bytes = config let scan_results_limit_bytes = config
.as_ref() .as_ref()
.and_then(|config| config.scan_results_limit_bytes); .and_then(|config| config.scan_results_limit_bytes);
let (account_maps, bin_calculator, storage) = Self::allocate_accounts_index(config); let (account_maps, bin_calculator, storage) = Self::allocate_accounts_index(config, exit);
Self { Self {
account_maps, account_maps,
bin_calculator, bin_calculator,
@ -737,6 +737,7 @@ impl<T: IndexValue> AccountsIndex<T> {
fn allocate_accounts_index( fn allocate_accounts_index(
config: Option<AccountsIndexConfig>, config: Option<AccountsIndexConfig>,
exit: &Arc<AtomicBool>,
) -> ( ) -> (
LockMapType<T>, LockMapType<T>,
PubkeyBinCalculator24, PubkeyBinCalculator24,
@ -748,7 +749,7 @@ impl<T: IndexValue> AccountsIndex<T> {
.unwrap_or(BINS_DEFAULT); .unwrap_or(BINS_DEFAULT);
// create bin_calculator early to verify # bins is reasonable // create bin_calculator early to verify # bins is reasonable
let bin_calculator = PubkeyBinCalculator24::new(bins); let bin_calculator = PubkeyBinCalculator24::new(bins);
let storage = AccountsIndexStorage::new(bins, &config); let storage = AccountsIndexStorage::new(bins, &config, exit);
let account_maps = (0..bins) let account_maps = (0..bins)
.into_iter() .into_iter()
.map(|bin| Arc::clone(&storage.in_mem[bin])) .map(|bin| Arc::clone(&storage.in_mem[bin]))
@ -2570,7 +2571,7 @@ pub mod tests {
} else { } else {
IndexLimitMb::InMemOnly // in-mem only IndexLimitMb::InMemOnly // in-mem only
}; };
let index = AccountsIndex::<T>::new(Some(config)); let index = AccountsIndex::<T>::new(Some(config), &Arc::default());
let mut gc = Vec::new(); let mut gc = Vec::new();
if upsert { if upsert {
@ -4226,7 +4227,7 @@ pub mod tests {
fn test_illegal_bins() { fn test_illegal_bins() {
let mut config = AccountsIndexConfig::default(); let mut config = AccountsIndexConfig::default();
config.bins = Some(3); config.bins = Some(3);
AccountsIndex::<bool>::new(Some(config)); AccountsIndex::<bool>::new(Some(config), &Arc::default());
} }
#[test] #[test]

View File

@ -21,6 +21,7 @@ pub struct AccountsIndexStorage<T: IndexValue> {
pub storage: Arc<BucketMapHolder<T>>, pub storage: Arc<BucketMapHolder<T>>,
pub in_mem: Vec<Arc<InMemAccountsIndex<T>>>, pub in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
exit: Arc<AtomicBool>,
/// set_startup(true) creates bg threads which are kept alive until set_startup(false) /// set_startup(true) creates bg threads which are kept alive until set_startup(false)
startup_worker_threads: Mutex<Option<BgThreads>>, startup_worker_threads: Mutex<Option<BgThreads>>,
@ -57,9 +58,10 @@ impl BgThreads {
in_mem: &[Arc<InMemAccountsIndex<T>>], in_mem: &[Arc<InMemAccountsIndex<T>>],
threads: usize, threads: usize,
can_advance_age: bool, can_advance_age: bool,
exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
// stop signal used for THIS batch of bg threads // stop signal used for THIS batch of bg threads
let exit = Arc::new(AtomicBool::default()); let local_exit = Arc::new(AtomicBool::default());
let handles = Some( let handles = Some(
(0..threads) (0..threads)
.into_iter() .into_iter()
@ -67,14 +69,19 @@ impl BgThreads {
// the first thread we start is special // the first thread we start is special
let can_advance_age = can_advance_age && idx == 0; let can_advance_age = can_advance_age && idx == 0;
let storage_ = Arc::clone(storage); let storage_ = Arc::clone(storage);
let exit_ = Arc::clone(&exit); let local_exit_ = Arc::clone(&local_exit);
let system_exit_ = Arc::clone(exit);
let in_mem_ = in_mem.to_vec(); let in_mem_ = in_mem.to_vec();
// note that using rayon here causes us to exhaust # rayon threads and many tests running in parallel deadlock // note that using rayon here causes us to exhaust # rayon threads and many tests running in parallel deadlock
Builder::new() Builder::new()
.name(format!("solIdxFlusher{:02}", idx)) .name(format!("solIdxFlusher{:02}", idx))
.spawn(move || { .spawn(move || {
storage_.background(exit_, in_mem_, can_advance_age); storage_.background(
vec![local_exit_, system_exit_],
in_mem_,
can_advance_age,
);
}) })
.unwrap() .unwrap()
}) })
@ -82,7 +89,7 @@ impl BgThreads {
); );
BgThreads { BgThreads {
exit, exit: local_exit,
handles, handles,
wait: Arc::clone(&storage.wait_dirty_or_aged), wait: Arc::clone(&storage.wait_dirty_or_aged),
} }
@ -117,6 +124,7 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
&self.in_mem, &self.in_mem,
Self::num_threads(), Self::num_threads(),
false, // cannot advance age from any of these threads false, // cannot advance age from any of these threads
&self.exit,
)); ));
} }
self.storage.set_startup(value); self.storage.set_startup(value);
@ -147,7 +155,7 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
} }
/// allocate BucketMapHolder and InMemAccountsIndex[] /// allocate BucketMapHolder and InMemAccountsIndex[]
pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> Self { pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, exit: &Arc<AtomicBool>) -> Self {
let threads = config let threads = config
.as_ref() .as_ref()
.and_then(|config| config.flush_threads) .and_then(|config| config.flush_threads)
@ -161,10 +169,11 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Self { Self {
_bg_threads: BgThreads::new(&storage, &in_mem, threads, true), _bg_threads: BgThreads::new(&storage, &in_mem, threads, true, exit),
storage, storage,
in_mem, in_mem,
startup_worker_threads: Mutex::default(), startup_worker_threads: Mutex::default(),
exit: Arc::clone(exit),
} }
} }
} }

View File

@ -1588,6 +1588,7 @@ impl Bank {
false, false,
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
} }
@ -1604,6 +1605,7 @@ impl Bank {
false, false,
Some(ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS), Some(ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS),
None, None,
&Arc::default(),
) )
} }
@ -1620,6 +1622,7 @@ impl Bank {
debug_do_not_add_builtins: bool, debug_do_not_add_builtins: bool,
accounts_db_config: Option<AccountsDbConfig>, accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
let accounts = Accounts::new_with_config( let accounts = Accounts::new_with_config(
paths, paths,
@ -1629,6 +1632,7 @@ impl Bank {
shrink_ratio, shrink_ratio,
accounts_db_config, accounts_db_config,
accounts_update_notifier, accounts_update_notifier,
exit,
); );
let mut bank = Self::default_with_accounts(accounts); let mut bank = Self::default_with_accounts(accounts);
bank.ancestors = Ancestors::from(vec![bank.slot()]); bank.ancestors = Ancestors::from(vec![bank.slot()]);

View File

@ -327,7 +327,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
// intended to execute in a bg thread // intended to execute in a bg thread
pub fn background( pub fn background(
&self, &self,
exit: Arc<AtomicBool>, exit: Vec<Arc<AtomicBool>>,
in_mem: Vec<Arc<InMemAccountsIndex<T>>>, in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
can_advance_age: bool, can_advance_age: bool,
) { ) {
@ -370,7 +370,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
} }
throttling_wait_ms = None; throttling_wait_ms = None;
if exit.load(Ordering::Relaxed) { if exit.iter().any(|exit| exit.load(Ordering::Relaxed)) {
break; break;
} }

View File

@ -40,7 +40,7 @@ use {
path::{Path, PathBuf}, path::{Path, PathBuf},
result::Result, result::Result,
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Arc,
}, },
thread::Builder, thread::Builder,
@ -317,6 +317,7 @@ pub(crate) fn bank_from_streams<R>(
verify_index: bool, verify_index: bool,
accounts_db_config: Option<AccountsDbConfig>, accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: &Arc<AtomicBool>,
) -> std::result::Result<Bank, Error> ) -> std::result::Result<Bank, Error>
where where
R: Read, R: Read,
@ -338,6 +339,7 @@ where
verify_index, verify_index,
accounts_db_config, accounts_db_config,
accounts_update_notifier, accounts_update_notifier,
exit,
) )
} }
@ -531,6 +533,7 @@ fn reconstruct_bank_from_fields<E>(
verify_index: bool, verify_index: bool,
accounts_db_config: Option<AccountsDbConfig>, accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: &Arc<AtomicBool>,
) -> Result<Bank, Error> ) -> Result<Bank, Error>
where where
E: SerializableStorage + std::marker::Sync, E: SerializableStorage + std::marker::Sync,
@ -547,6 +550,7 @@ where
verify_index, verify_index,
accounts_db_config, accounts_db_config,
accounts_update_notifier, accounts_update_notifier,
exit,
bank_fields.epoch_accounts_hash, bank_fields.epoch_accounts_hash,
)?; )?;
@ -668,6 +672,7 @@ fn reconstruct_accountsdb_from_fields<E>(
verify_index: bool, verify_index: bool,
accounts_db_config: Option<AccountsDbConfig>, accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: &Arc<AtomicBool>,
epoch_accounts_hash: Option<Hash>, epoch_accounts_hash: Option<Hash>,
) -> Result<(AccountsDb, ReconstructedAccountsDbInfo), Error> ) -> Result<(AccountsDb, ReconstructedAccountsDbInfo), Error>
where where
@ -681,6 +686,7 @@ where
shrink_ratio, shrink_ratio,
accounts_db_config, accounts_db_config,
accounts_update_notifier, accounts_update_notifier,
exit,
); );
*accounts_db.epoch_accounts_hash.lock().unwrap() = *accounts_db.epoch_accounts_hash.lock().unwrap() =
epoch_accounts_hash.map(EpochAccountsHash::new); epoch_accounts_hash.map(EpochAccountsHash::new);

View File

@ -116,6 +116,7 @@ where
false, false,
Some(crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
None, None,
) )
.map(|(accounts_db, _)| accounts_db) .map(|(accounts_db, _)| accounts_db)
@ -405,6 +406,7 @@ fn test_bank_serialize_style(
false, false,
Some(crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
.unwrap(); .unwrap();
dbank.status_cache = Arc::new(RwLock::new(status_cache)); dbank.status_cache = Arc::new(RwLock::new(status_cache));
@ -551,6 +553,7 @@ fn test_extra_fields_eof() {
false, false,
Some(crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
.unwrap(); .unwrap();
@ -613,6 +616,7 @@ fn test_extra_fields_full_snapshot_archive() {
false, false,
Some(crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
.unwrap(); .unwrap();
@ -674,6 +678,7 @@ fn test_blank_extra_fields() {
false, false,
Some(crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
.unwrap(); .unwrap();

View File

@ -47,7 +47,10 @@ use {
path::{Path, PathBuf}, path::{Path, PathBuf},
process::ExitStatus, process::ExitStatus,
str::FromStr, str::FromStr,
sync::{atomic::AtomicU32, Arc}, sync::{
atomic::{AtomicBool, AtomicU32},
Arc,
},
}, },
tar::{self, Archive}, tar::{self, Archive},
tempfile::TempDir, tempfile::TempDir,
@ -966,6 +969,7 @@ pub fn bank_from_snapshot_archives(
verify_index: bool, verify_index: bool,
accounts_db_config: Option<AccountsDbConfig>, accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: &Arc<AtomicBool>,
) -> Result<(Bank, BankFromArchiveTimings)> { ) -> Result<(Bank, BankFromArchiveTimings)> {
let (unarchived_full_snapshot, mut unarchived_incremental_snapshot, next_append_vec_id) = let (unarchived_full_snapshot, mut unarchived_incremental_snapshot, next_append_vec_id) =
verify_and_unarchive_snapshots( verify_and_unarchive_snapshots(
@ -1008,6 +1012,7 @@ pub fn bank_from_snapshot_archives(
verify_index, verify_index,
accounts_db_config, accounts_db_config,
accounts_update_notifier, accounts_update_notifier,
exit,
)?; )?;
measure_rebuild.stop(); measure_rebuild.stop();
info!("{}", measure_rebuild); info!("{}", measure_rebuild);
@ -1056,6 +1061,7 @@ pub fn bank_from_latest_snapshot_archives(
verify_index: bool, verify_index: bool,
accounts_db_config: Option<AccountsDbConfig>, accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: &Arc<AtomicBool>,
) -> Result<( ) -> Result<(
Bank, Bank,
FullSnapshotArchiveInfo, FullSnapshotArchiveInfo,
@ -1100,6 +1106,7 @@ pub fn bank_from_latest_snapshot_archives(
verify_index, verify_index,
accounts_db_config, accounts_db_config,
accounts_update_notifier, accounts_update_notifier,
exit,
)?; )?;
datapoint_info!( datapoint_info!(
@ -1802,6 +1809,7 @@ fn rebuild_bank_from_snapshots(
verify_index: bool, verify_index: bool,
accounts_db_config: Option<AccountsDbConfig>, accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
exit: &Arc<AtomicBool>,
) -> Result<Bank> { ) -> Result<Bank> {
let (full_snapshot_version, full_snapshot_root_paths) = let (full_snapshot_version, full_snapshot_root_paths) =
verify_unpacked_snapshots_dir_and_version( verify_unpacked_snapshots_dir_and_version(
@ -1851,6 +1859,7 @@ fn rebuild_bank_from_snapshots(
verify_index, verify_index,
accounts_db_config, accounts_db_config,
accounts_update_notifier, accounts_update_notifier,
exit,
), ),
}?, }?,
) )
@ -3384,6 +3393,7 @@ mod tests {
false, false,
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
.unwrap(); .unwrap();
@ -3496,6 +3506,7 @@ mod tests {
false, false,
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
.unwrap(); .unwrap();
@ -3628,6 +3639,7 @@ mod tests {
false, false,
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
.unwrap(); .unwrap();
@ -3750,6 +3762,7 @@ mod tests {
false, false,
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
.unwrap(); .unwrap();
@ -3890,6 +3903,7 @@ mod tests {
false, false,
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@ -3954,6 +3968,7 @@ mod tests {
false, false,
Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
None, None,
&Arc::default(),
) )
.unwrap(); .unwrap();
assert_eq!( assert_eq!(