Optimize bank_forks critical section (#7477)

This commit is contained in:
sakridge 2019-12-13 17:20:31 -08:00 committed by GitHub
parent ecdea54203
commit 98b80288ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 23 additions and 11 deletions

View File

@ -236,7 +236,7 @@ impl ReplayStage {
let start = allocated.get();
Self::generate_new_bank_forks(
&blocktree,
&mut bank_forks.write().unwrap(),
&bank_forks,
&leader_schedule_cache,
&subscriptions,
);
@ -1104,11 +1104,12 @@ impl ReplayStage {
fn generate_new_bank_forks(
blocktree: &Blocktree,
forks: &mut BankForks,
forks_lock: &RwLock<BankForks>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
subscriptions: &Arc<RpcSubscriptions>,
) {
// Find the next slot that chains to the old slot
let forks = forks_lock.read().unwrap();
let frozen_banks = forks.frozen_banks();
let frozen_bank_slots: Vec<u64> = frozen_banks.keys().cloned().collect();
let next_slots = blocktree
@ -1120,13 +1121,14 @@ impl ReplayStage {
next_slots.sort();
next_slots
});
let mut new_banks = HashMap::new();
for (parent_slot, children) in next_slots {
let parent_bank = frozen_banks
.get(&parent_slot)
.expect("missing parent in bank forks")
.clone();
for child_slot in children {
if forks.get(child_slot).is_some() {
if forks.get(child_slot).is_some() || new_banks.get(&child_slot).is_some() {
trace!("child already active or frozen {}", child_slot);
continue;
}
@ -1140,9 +1142,18 @@ impl ReplayStage {
forks.root()
);
subscriptions.notify_slot(child_slot, parent_slot, forks.root());
forks.insert(Bank::new_from_parent(&parent_bank, &leader, child_slot));
new_banks.insert(
child_slot,
Bank::new_from_parent(&parent_bank, &leader, child_slot),
);
}
}
drop(forks);
let mut forks = forks_lock.write().unwrap();
for (_, bank) in new_banks {
forks.insert(bank);
}
}
pub fn join(self) -> thread::Result<()> {
@ -1482,33 +1493,34 @@ pub(crate) mod tests {
let bank0 = Bank::new(&genesis_config);
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0));
let subscriptions = Arc::new(RpcSubscriptions::default());
let mut bank_forks = BankForks::new(0, bank0);
let bank_forks = BankForks::new(0, bank0);
bank_forks.working_bank().freeze();
// Insert shred for slot 1, generate new forks, check result
let (shreds, _) = make_slot_entries(1, 0, 8);
blocktree.insert_shreds(shreds, None, false).unwrap();
assert!(bank_forks.get(1).is_none());
let bank_forks = RwLock::new(bank_forks);
ReplayStage::generate_new_bank_forks(
&blocktree,
&mut bank_forks,
&bank_forks,
&leader_schedule_cache,
&subscriptions,
);
assert!(bank_forks.get(1).is_some());
assert!(bank_forks.read().unwrap().get(1).is_some());
// Insert shred for slot 3, generate new forks, check result
let (shreds, _) = make_slot_entries(2, 0, 8);
blocktree.insert_shreds(shreds, None, false).unwrap();
assert!(bank_forks.get(2).is_none());
assert!(bank_forks.read().unwrap().get(2).is_none());
ReplayStage::generate_new_bank_forks(
&blocktree,
&mut bank_forks,
&bank_forks,
&leader_schedule_cache,
&subscriptions,
);
assert!(bank_forks.get(1).is_some());
assert!(bank_forks.get(2).is_some());
assert!(bank_forks.read().unwrap().get(1).is_some());
assert!(bank_forks.read().unwrap().get(2).is_some());
}
let _ignored = remove_dir_all(&ledger_path);