Set a global fork graph in program cache (#33776)

* Set a global fork graph in program cache

* fix deadlock

* review feedback
This commit is contained in:
Pankaj Garg 2023-10-20 08:47:03 -07:00 committed by GitHub
parent c98c24bd6d
commit 59cb3b57ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 145 additions and 83 deletions

View File

@ -3859,6 +3859,7 @@ impl ReplayStage {
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
drop_bank_sender: &Sender<Vec<Arc<Bank>>>,
) {
bank_forks.read().unwrap().prune_program_cache(new_root);
let removed_banks = bank_forks.write().unwrap().set_root(
new_root,
accounts_background_request_sender,

View File

@ -189,6 +189,15 @@ pub fn load_bank_forks(
(bank_forks, None)
};
bank_forks
.read()
.expect("Failed to read lock the bank forks")
.root_bank()
.loaded_programs_cache
.write()
.expect("Failed to write lock the program cache")
.set_fork_graph(bank_forks.clone());
let mut leader_schedule_cache =
LeaderScheduleCache::new_from_bank(&bank_forks.read().unwrap().root_bank());
if process_options.full_leader_cache {

View File

@ -1555,6 +1555,11 @@ fn load_frozen_forks(
root = new_root_bank.slot();
leader_schedule_cache.set_root(new_root_bank);
new_root_bank
.loaded_programs_cache
.write()
.unwrap()
.prune(root, new_root_bank.epoch());
let _ = bank_forks.write().unwrap().set_root(
root,
accounts_background_request_sender,

View File

@ -4,7 +4,7 @@ use {
timings::ExecuteDetailsTimings,
},
itertools::Itertools,
log::{debug, log_enabled, trace},
log::{debug, error, log_enabled, trace},
percentage::PercentageInteger,
solana_measure::measure::Measure,
solana_rbpf::{
@ -24,7 +24,7 @@ use {
fmt::{Debug, Formatter},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Arc, RwLock,
},
},
};
@ -442,8 +442,8 @@ impl Default for ProgramRuntimeEnvironments {
}
}
#[derive(Debug, Default)]
pub struct LoadedPrograms {
#[derive(Debug)]
pub struct LoadedPrograms<FG: ForkGraph> {
/// A two level index:
///
/// Pubkey is the address of a program, multiple versions can coexists simultaneously under the same address (in different slots).
@ -455,6 +455,20 @@ pub struct LoadedPrograms {
/// Environments of the current epoch
pub environments: ProgramRuntimeEnvironments,
pub stats: Stats,
fork_graph: Option<Arc<RwLock<FG>>>,
}
impl<FG: ForkGraph> Default for LoadedPrograms<FG> {
fn default() -> Self {
Self {
entries: HashMap::new(),
latest_root_slot: 0,
latest_root_epoch: 0,
environments: ProgramRuntimeEnvironments::default(),
stats: Stats::default(),
fork_graph: None,
}
}
}
#[derive(Clone, Debug, Default)]
@ -531,7 +545,11 @@ pub enum LoadedProgramMatchCriteria {
NoCriteria,
}
impl LoadedPrograms {
impl<FG: ForkGraph> LoadedPrograms<FG> {
pub fn set_fork_graph(&mut self, fork_graph: Arc<RwLock<FG>>) {
self.fork_graph = Some(fork_graph);
}
/// Returns the current environments depending on the given epoch
pub fn get_environments_for_epoch(&self, _epoch: Epoch) -> &ProgramRuntimeEnvironments {
&self.environments
@ -625,12 +643,15 @@ impl LoadedPrograms {
}
/// Before rerooting the blockstore this removes all superfluous entries
pub fn prune<F: ForkGraph>(
&mut self,
fork_graph: &F,
new_root_slot: Slot,
new_root_epoch: Epoch,
) {
pub fn prune(&mut self, new_root_slot: Slot, new_root_epoch: Epoch) {
let Some(fork_graph) = self.fork_graph.clone() else {
error!("Program cache doesn't have fork graph.");
return;
};
let Ok(fork_graph) = fork_graph.read() else {
error!("Failed to lock fork graph for reading.");
return;
};
for second_level in self.entries.values_mut() {
// Remove entries un/re/deployed on orphan forks
let mut first_ancestor_found = false;
@ -911,7 +932,7 @@ impl solana_frozen_abi::abi_example::AbiExample for LoadedProgram {
}
#[cfg(RUSTC_WITH_SPECIALIZATION)]
impl solana_frozen_abi::abi_example::AbiExample for LoadedPrograms {
impl<FG: ForkGraph> solana_frozen_abi::abi_example::AbiExample for LoadedPrograms<FG> {
fn example() -> Self {
// LoadedPrograms isn't serializable by definition.
Self::default()
@ -937,7 +958,7 @@ mod tests {
ops::ControlFlow,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Arc, RwLock,
},
},
};
@ -945,7 +966,7 @@ mod tests {
static MOCK_ENVIRONMENT: std::sync::OnceLock<ProgramRuntimeEnvironment> =
std::sync::OnceLock::<ProgramRuntimeEnvironment>::new();
fn new_mock_cache() -> LoadedPrograms {
fn new_mock_cache<FG: ForkGraph>() -> LoadedPrograms<FG> {
let mut cache = LoadedPrograms::default();
cache.environments.program_runtime_v1 = MOCK_ENVIRONMENT
.get_or_init(|| Arc::new(BuiltinProgram::new_mock()))
@ -999,8 +1020,8 @@ mod tests {
})
}
fn set_tombstone(
cache: &mut LoadedPrograms,
fn set_tombstone<FG: ForkGraph>(
cache: &mut LoadedPrograms<FG>,
key: Pubkey,
slot: Slot,
reason: LoadedProgramType,
@ -1008,8 +1029,8 @@ mod tests {
cache.assign_program(key, Arc::new(LoadedProgram::new_tombstone(slot, reason)))
}
fn insert_unloaded_program(
cache: &mut LoadedPrograms,
fn insert_unloaded_program<FG: ForkGraph>(
cache: &mut LoadedPrograms<FG>,
key: Pubkey,
slot: Slot,
) -> Arc<LoadedProgram> {
@ -1031,9 +1052,10 @@ mod tests {
cache.replenish(key, unloaded).1
}
fn num_matching_entries<P>(cache: &LoadedPrograms, predicate: P) -> usize
fn num_matching_entries<P, FG>(cache: &LoadedPrograms<FG>, predicate: P) -> usize
where
P: Fn(&LoadedProgramType) -> bool,
FG: ForkGraph,
{
cache
.entries
@ -1052,7 +1074,7 @@ mod tests {
let mut programs = vec![];
let mut num_total_programs: usize = 0;
let mut cache = new_mock_cache();
let mut cache = new_mock_cache::<TestForkGraph>();
let program1 = Pubkey::new_unique();
let program1_deployment_slots = [0, 10, 20];
@ -1218,7 +1240,7 @@ mod tests {
#[test]
fn test_usage_count_of_unloaded_program() {
let mut cache = new_mock_cache();
let mut cache = new_mock_cache::<TestForkGraph>();
let program = Pubkey::new_unique();
let num_total_programs = 6;
@ -1270,7 +1292,7 @@ mod tests {
#[test]
fn test_replace_tombstones() {
let mut cache = new_mock_cache();
let mut cache = new_mock_cache::<TestForkGraph>();
let program1 = Pubkey::new_unique();
let env = Arc::new(BuiltinProgram::new_mock());
set_tombstone(
@ -1302,7 +1324,7 @@ mod tests {
assert_eq!(tombstone.deployment_slot, 100);
assert_eq!(tombstone.effective_slot, 100);
let mut cache = new_mock_cache();
let mut cache = new_mock_cache::<TestForkGraph>();
let program1 = Pubkey::new_unique();
let tombstone = set_tombstone(
&mut cache,
@ -1362,48 +1384,55 @@ mod tests {
#[test]
fn test_prune_empty() {
let mut cache = new_mock_cache();
let fork_graph = TestForkGraph {
let mut cache = new_mock_cache::<TestForkGraph>();
let fork_graph = Arc::new(RwLock::new(TestForkGraph {
relation: BlockRelation::Unrelated,
};
}));
cache.prune(&fork_graph, 0, 0);
cache.set_fork_graph(fork_graph);
cache.prune(0, 0);
assert!(cache.entries.is_empty());
cache.prune(&fork_graph, 10, 0);
cache.prune(10, 0);
assert!(cache.entries.is_empty());
let mut cache = new_mock_cache();
let fork_graph = TestForkGraph {
let mut cache = new_mock_cache::<TestForkGraph>();
let fork_graph = Arc::new(RwLock::new(TestForkGraph {
relation: BlockRelation::Ancestor,
};
}));
cache.prune(&fork_graph, 0, 0);
cache.set_fork_graph(fork_graph);
cache.prune(0, 0);
assert!(cache.entries.is_empty());
cache.prune(&fork_graph, 10, 0);
cache.prune(10, 0);
assert!(cache.entries.is_empty());
let mut cache = new_mock_cache();
let fork_graph = TestForkGraph {
let mut cache = new_mock_cache::<TestForkGraph>();
let fork_graph = Arc::new(RwLock::new(TestForkGraph {
relation: BlockRelation::Descendant,
};
}));
cache.prune(&fork_graph, 0, 0);
cache.set_fork_graph(fork_graph);
cache.prune(0, 0);
assert!(cache.entries.is_empty());
cache.prune(&fork_graph, 10, 0);
cache.prune(10, 0);
assert!(cache.entries.is_empty());
let mut cache = new_mock_cache();
let fork_graph = TestForkGraph {
let mut cache = new_mock_cache::<TestForkGraph>();
let fork_graph = Arc::new(RwLock::new(TestForkGraph {
relation: BlockRelation::Unknown,
};
}));
cache.set_fork_graph(fork_graph);
cache.prune(&fork_graph, 0, 0);
cache.prune(0, 0);
assert!(cache.entries.is_empty());
cache.prune(&fork_graph, 10, 0);
cache.prune(10, 0);
assert!(cache.entries.is_empty());
}
@ -1512,7 +1541,7 @@ mod tests {
#[test]
fn test_fork_extract_and_prune() {
let mut cache = new_mock_cache();
let mut cache = new_mock_cache::<TestForkGraphSpecific>();
// Fork graph created for the test
// 0
@ -1534,6 +1563,9 @@ mod tests {
fork_graph.insert_fork(&[0, 5, 11, 15, 16, 19, 21, 23]);
fork_graph.insert_fork(&[0, 5, 11, 25, 27]);
let fork_graph = Arc::new(RwLock::new(fork_graph));
cache.set_fork_graph(fork_graph);
let program1 = Pubkey::new_unique();
assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0);
assert!(!cache.replenish(program1, new_test_loaded_program(10, 11)).0);
@ -1783,7 +1815,7 @@ mod tests {
programs.pop();
}
cache.prune(&fork_graph, 5, 0);
cache.prune(5, 0);
// Fork graph after pruning
// 0
@ -1848,7 +1880,7 @@ mod tests {
assert!(match_slot(&found, &program3, 25, 27));
assert!(match_slot(&found, &program4, 5, 27));
cache.prune(&fork_graph, 15, 0);
cache.prune(15, 0);
// Fork graph after pruning
// 0
@ -1893,7 +1925,7 @@ mod tests {
#[test]
fn test_extract_using_deployment_slot() {
let mut cache = new_mock_cache();
let mut cache = new_mock_cache::<TestForkGraphSpecific>();
// Fork graph created for the test
// 0
@ -1915,6 +1947,9 @@ mod tests {
fork_graph.insert_fork(&[0, 5, 11, 15, 16, 19, 21, 23]);
fork_graph.insert_fork(&[0, 5, 11, 25, 27]);
let fork_graph = Arc::new(RwLock::new(fork_graph));
cache.set_fork_graph(fork_graph);
let program1 = Pubkey::new_unique();
assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0);
assert!(!cache.replenish(program1, new_test_loaded_program(20, 21)).0);
@ -1978,7 +2013,7 @@ mod tests {
#[test]
fn test_extract_unloaded() {
let mut cache = new_mock_cache();
let mut cache = new_mock_cache::<TestForkGraphSpecific>();
// Fork graph created for the test
// 0
@ -2000,6 +2035,9 @@ mod tests {
fork_graph.insert_fork(&[0, 5, 11, 15, 16, 19, 21, 23]);
fork_graph.insert_fork(&[0, 5, 11, 25, 27]);
let fork_graph = Arc::new(RwLock::new(fork_graph));
cache.set_fork_graph(fork_graph);
let program1 = Pubkey::new_unique();
assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0);
assert!(!cache.replenish(program1, new_test_loaded_program(20, 21)).0);
@ -2096,7 +2134,7 @@ mod tests {
#[test]
fn test_prune_expired() {
let mut cache = new_mock_cache();
let mut cache = new_mock_cache::<TestForkGraphSpecific>();
// Fork graph created for the test
// 0
@ -2117,6 +2155,8 @@ mod tests {
fork_graph.insert_fork(&[0, 10, 20, 22]);
fork_graph.insert_fork(&[0, 5, 11, 15, 16, 19, 21, 23]);
fork_graph.insert_fork(&[0, 5, 11, 25, 27]);
let fork_graph = Arc::new(RwLock::new(fork_graph));
cache.set_fork_graph(fork_graph);
let program1 = Pubkey::new_unique();
assert!(!cache.replenish(program1, new_test_loaded_program(10, 11)).0);
@ -2198,7 +2238,7 @@ mod tests {
);
// New root 5 should not evict the expired entry for program1
cache.prune(&fork_graph, 5, 0);
cache.prune(5, 0);
assert_eq!(
cache
.entries
@ -2209,13 +2249,13 @@ mod tests {
);
// New root 15 should evict the expired entry for program1
cache.prune(&fork_graph, 15, 0);
cache.prune(15, 0);
assert!(cache.entries.get(&program1).is_none());
}
#[test]
fn test_fork_prune_find_first_ancestor() {
let mut cache = new_mock_cache();
let mut cache = new_mock_cache::<TestForkGraphSpecific>();
// Fork graph created for the test
// 0
@ -2230,12 +2270,14 @@ mod tests {
let mut fork_graph = TestForkGraphSpecific::default();
fork_graph.insert_fork(&[0, 10, 20]);
fork_graph.insert_fork(&[0, 5]);
let fork_graph = Arc::new(RwLock::new(fork_graph));
cache.set_fork_graph(fork_graph);
let program1 = Pubkey::new_unique();
assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0);
assert!(!cache.replenish(program1, new_test_loaded_program(5, 6)).0);
cache.prune(&fork_graph, 10, 0);
cache.prune(10, 0);
let working_slot = TestWorkingSlot::new(20, &[0, 10, 20]);
let ExtractedPrograms {
@ -2261,7 +2303,7 @@ mod tests {
#[test]
fn test_prune_by_deployment_slot() {
let mut cache = new_mock_cache();
let mut cache = new_mock_cache::<TestForkGraphSpecific>();
// Fork graph created for the test
// 0
@ -2276,6 +2318,8 @@ mod tests {
let mut fork_graph = TestForkGraphSpecific::default();
fork_graph.insert_fork(&[0, 10, 20]);
fork_graph.insert_fork(&[0, 5]);
let fork_graph = Arc::new(RwLock::new(fork_graph));
cache.set_fork_graph(fork_graph);
let program1 = Pubkey::new_unique();
assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0);
@ -2380,34 +2424,34 @@ mod tests {
#[test]
fn test_usable_entries_for_slot() {
new_mock_cache();
new_mock_cache::<TestForkGraph>();
let tombstone = Arc::new(LoadedProgram::new_tombstone(0, LoadedProgramType::Closed));
assert!(LoadedPrograms::is_entry_usable(
assert!(LoadedPrograms::<TestForkGraph>::is_entry_usable(
&tombstone,
0,
&LoadedProgramMatchCriteria::NoCriteria
));
assert!(LoadedPrograms::is_entry_usable(
assert!(LoadedPrograms::<TestForkGraph>::is_entry_usable(
&tombstone,
1,
&LoadedProgramMatchCriteria::Tombstone
));
assert!(LoadedPrograms::is_entry_usable(
assert!(LoadedPrograms::<TestForkGraph>::is_entry_usable(
&tombstone,
1,
&LoadedProgramMatchCriteria::NoCriteria
));
assert!(LoadedPrograms::is_entry_usable(
assert!(LoadedPrograms::<TestForkGraph>::is_entry_usable(
&tombstone,
1,
&LoadedProgramMatchCriteria::DeployedOnOrAfterSlot(0)
));
assert!(!LoadedPrograms::is_entry_usable(
assert!(!LoadedPrograms::<TestForkGraph>::is_entry_usable(
&tombstone,
1,
&LoadedProgramMatchCriteria::DeployedOnOrAfterSlot(1)
@ -2415,31 +2459,31 @@ mod tests {
let program = new_test_loaded_program(0, 1);
assert!(LoadedPrograms::is_entry_usable(
assert!(LoadedPrograms::<TestForkGraph>::is_entry_usable(
&program,
0,
&LoadedProgramMatchCriteria::NoCriteria
));
assert!(!LoadedPrograms::is_entry_usable(
assert!(!LoadedPrograms::<TestForkGraph>::is_entry_usable(
&program,
1,
&LoadedProgramMatchCriteria::Tombstone
));
assert!(LoadedPrograms::is_entry_usable(
assert!(LoadedPrograms::<TestForkGraph>::is_entry_usable(
&program,
1,
&LoadedProgramMatchCriteria::NoCriteria
));
assert!(LoadedPrograms::is_entry_usable(
assert!(LoadedPrograms::<TestForkGraph>::is_entry_usable(
&program,
1,
&LoadedProgramMatchCriteria::DeployedOnOrAfterSlot(0)
));
assert!(!LoadedPrograms::is_entry_usable(
assert!(!LoadedPrograms::<TestForkGraph>::is_entry_usable(
&program,
1,
&LoadedProgramMatchCriteria::DeployedOnOrAfterSlot(1)
@ -2452,37 +2496,37 @@ mod tests {
Some(2),
));
assert!(LoadedPrograms::is_entry_usable(
assert!(LoadedPrograms::<TestForkGraph>::is_entry_usable(
&program,
0,
&LoadedProgramMatchCriteria::NoCriteria
));
assert!(LoadedPrograms::is_entry_usable(
assert!(LoadedPrograms::<TestForkGraph>::is_entry_usable(
&program,
1,
&LoadedProgramMatchCriteria::NoCriteria
));
assert!(!LoadedPrograms::is_entry_usable(
assert!(!LoadedPrograms::<TestForkGraph>::is_entry_usable(
&program,
1,
&LoadedProgramMatchCriteria::Tombstone
));
assert!(!LoadedPrograms::is_entry_usable(
assert!(!LoadedPrograms::<TestForkGraph>::is_entry_usable(
&program,
2,
&LoadedProgramMatchCriteria::NoCriteria
));
assert!(LoadedPrograms::is_entry_usable(
assert!(LoadedPrograms::<TestForkGraph>::is_entry_usable(
&program,
1,
&LoadedProgramMatchCriteria::DeployedOnOrAfterSlot(0)
));
assert!(!LoadedPrograms::is_entry_usable(
assert!(!LoadedPrograms::<TestForkGraph>::is_entry_usable(
&program,
1,
&LoadedProgramMatchCriteria::DeployedOnOrAfterSlot(1)

View File

@ -39,6 +39,7 @@ pub use solana_sdk::reward_type::RewardType;
use {
crate::{
bank::metrics::*,
bank_forks::BankForks,
builtins::{BuiltinPrototype, BUILTINS},
epoch_rewards_hasher::hash_rewards_into_partitions,
epoch_stakes::{EpochStakes, NodeVoteAccounts},
@ -820,7 +821,7 @@ pub struct Bank {
pub incremental_snapshot_persistence: Option<BankIncrementalSnapshotPersistence>,
pub loaded_programs_cache: Arc<RwLock<LoadedPrograms>>,
pub loaded_programs_cache: Arc<RwLock<LoadedPrograms<BankForks>>>,
pub check_program_modification_slot: bool,
@ -1070,7 +1071,7 @@ impl Bank {
accounts_data_size_delta_on_chain: AtomicI64::new(0),
accounts_data_size_delta_off_chain: AtomicI64::new(0),
fee_structure: FeeStructure::default(),
loaded_programs_cache: Arc::<RwLock<LoadedPrograms>>::default(),
loaded_programs_cache: Arc::<RwLock<LoadedPrograms<BankForks>>>::default(),
check_program_modification_slot: false,
epoch_reward_status: EpochRewardStatus::default(),
};
@ -1856,7 +1857,7 @@ impl Bank {
accounts_data_size_delta_on_chain: AtomicI64::new(0),
accounts_data_size_delta_off_chain: AtomicI64::new(0),
fee_structure: FeeStructure::default(),
loaded_programs_cache: Arc::<RwLock<LoadedPrograms>>::default(),
loaded_programs_cache: Arc::<RwLock<LoadedPrograms<BankForks>>>::default(),
check_program_modification_slot: false,
epoch_reward_status: EpochRewardStatus::default(),
};

View File

@ -55,6 +55,7 @@ struct SetRootTimings {
prune_remove_ms: i64,
}
#[derive(Debug)]
pub struct BankForks {
banks: HashMap<Slot, Arc<Bank>>,
descendants: HashMap<Slot, HashSet<Slot>>,
@ -404,6 +405,16 @@ impl BankForks {
)
}
pub fn prune_program_cache(&self, root: Slot) {
if let Some(root_bank) = self.banks.get(&root) {
root_bank
.loaded_programs_cache
.write()
.unwrap()
.prune(root, root_bank.epoch());
}
}
pub fn set_root(
&mut self,
root: Slot,
@ -411,15 +422,6 @@ impl BankForks {
highest_super_majority_root: Option<Slot>,
) -> Vec<Arc<Bank>> {
let program_cache_prune_start = Instant::now();
let root_bank = self
.banks
.get(&root)
.expect("root bank didn't exist in bank_forks");
root_bank
.loaded_programs_cache
.write()
.unwrap()
.prune(self, root, root_bank.epoch());
let set_root_start = Instant::now();
let (removed_banks, set_root_metrics) = self.do_set_root_return_metrics(
root,