From 68e5a2ef5624a3f58f8bf99423b1ab322ea6702c Mon Sep 17 00:00:00 2001 From: sakridge Date: Wed, 23 Sep 2020 18:46:42 -0700 Subject: [PATCH] Add RPC notify and banking keys debug (#12396) --- core/src/rpc_subscriptions.rs | 65 +++++++++++++++++++++++------ core/src/validator.rs | 3 ++ core/tests/bank_forks.rs | 2 + ledger/src/bank_forks_utils.rs | 1 + ledger/src/blockstore_processor.rs | 6 ++- runtime/benches/accounts.rs | 3 +- runtime/src/bank.rs | 17 +++++++- runtime/src/serde_snapshot.rs | 7 +++- runtime/src/serde_snapshot/tests.rs | 1 + runtime/src/snapshot_utils.rs | 6 +++ validator/src/main.rs | 20 +++++++++ 11 files changed, 112 insertions(+), 19 deletions(-) diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 07c04c966..d2785bbcf 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -17,6 +17,7 @@ use solana_client::{ RpcResponseContext, RpcSignatureResult, SlotInfo, }, }; +use solana_measure::measure::Measure; use solana_runtime::{ bank::Bank, bank_forks::BankForks, @@ -445,7 +446,7 @@ impl RpcSubscriptions { account_subscriptions: Arc, notifier: &RpcNotifier, commitment_slots: &CommitmentSlots, - ) { + ) -> HashSet { let subscriptions = account_subscriptions.read().unwrap(); check_commitment_and_notify( &subscriptions, @@ -455,7 +456,7 @@ impl RpcSubscriptions { Bank::get_account_modified_slot, filter_account_result, notifier, - ); + ) } fn check_program( @@ -464,7 +465,7 @@ impl RpcSubscriptions { program_subscriptions: Arc, notifier: &RpcNotifier, commitment_slots: &CommitmentSlots, - ) { + ) -> HashSet { let subscriptions = program_subscriptions.read().unwrap(); check_commitment_and_notify( &subscriptions, @@ -474,7 +475,7 @@ impl RpcSubscriptions { Bank::get_program_accounts_modified_since_parent, filter_program_results, notifier, - ); + ) } fn check_signature( @@ -483,7 +484,7 @@ impl RpcSubscriptions { signature_subscriptions: Arc, notifier: &RpcNotifier, commitment_slots: &CommitmentSlots, - ) { + ) -> HashSet { let mut subscriptions = signature_subscriptions.write().unwrap(); let notified_ids = check_commitment_and_notify( &subscriptions, @@ -500,6 +501,7 @@ impl RpcSubscriptions { subscriptions.remove(&signature); } } + notified_ids } pub fn add_account_subscription( @@ -777,6 +779,7 @@ impl RpcSubscriptions { match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) { Ok(notification_entry) => match notification_entry { NotificationEntry::Slot(slot_info) => { + debug!("slot notify: {:?}", slot_info); let subscriptions = subscriptions.slot_subscriptions.read().unwrap(); for (_, sink) in subscriptions.iter() { notifier.notify(slot_info, sink); @@ -786,6 +789,7 @@ impl RpcSubscriptions { // unlike `NotificationEntry::Gossip`, which also accounts for slots seen // in VoteState's from bank states built in ReplayStage. NotificationEntry::Vote(ref vote_info) => { + debug!("vote notify: {:?}", vote_info); let subscriptions = subscriptions.vote_subscriptions.read().unwrap(); for (_, sink) in subscriptions.iter() { notifier.notify( @@ -799,9 +803,12 @@ impl RpcSubscriptions { } } NotificationEntry::Root(root) => { - let subscriptions = subscriptions.root_subscriptions.read().unwrap(); - for (_, sink) in subscriptions.iter() { - notifier.notify(root, sink); + debug!("root notify: {:?}", root); + { + let subscriptions = subscriptions.root_subscriptions.read().unwrap(); + for (_, sink) in subscriptions.iter() { + notifier.notify(root, sink); + } } // Prune old pending notifications @@ -818,6 +825,7 @@ impl RpcSubscriptions { &bank_forks, &commitment_slots, ¬ifier, + "bank", ) } NotificationEntry::Frozen(slot) => { @@ -903,6 +911,7 @@ impl RpcSubscriptions { bank_forks, &commitment_slots, ¬ifier, + "gossip", ); } @@ -913,46 +922,76 @@ impl RpcSubscriptions { bank_forks: &Arc>, commitment_slots: &CommitmentSlots, notifier: &RpcNotifier, + source: &'static str, ) { + let mut accounts_time = Measure::start("accounts"); let pubkeys: Vec<_> = { let subs = account_subscriptions.read().unwrap(); subs.keys().cloned().collect() }; + let mut num_pubkeys_notified = 0; for pubkey in &pubkeys { - Self::check_account( + num_pubkeys_notified += Self::check_account( pubkey, bank_forks, account_subscriptions.clone(), ¬ifier, &commitment_slots, - ); + ) + .len(); } + accounts_time.stop(); + let mut programs_time = Measure::start("programs"); let programs: Vec<_> = { let subs = program_subscriptions.read().unwrap(); subs.keys().cloned().collect() }; + let mut num_programs_notified = 0; for program_id in &programs { - Self::check_program( + num_programs_notified += Self::check_program( program_id, bank_forks, program_subscriptions.clone(), ¬ifier, &commitment_slots, - ); + ) + .len(); } + programs_time.stop(); + let mut signatures_time = Measure::start("signatures"); let signatures: Vec<_> = { let subs = signature_subscriptions.read().unwrap(); subs.keys().cloned().collect() }; + let mut num_signatures_notified = 0; for signature in &signatures { - Self::check_signature( + num_signatures_notified += Self::check_signature( signature, bank_forks, signature_subscriptions.clone(), ¬ifier, &commitment_slots, + ) + .len(); + } + signatures_time.stop(); + let total_notified = num_pubkeys_notified + num_programs_notified + num_signatures_notified; + let total_ms = accounts_time.as_ms() + programs_time.as_ms() + signatures_time.as_ms(); + if total_notified > 0 || total_ms > 10 { + debug!( + "notified({}): accounts: {} / {} ({}) programs: {} / {} ({}) signatures: {} / {} ({})", + source, + pubkeys.len(), + num_pubkeys_notified, + accounts_time, + programs.len(), + num_programs_notified, + programs_time, + signatures.len(), + num_signatures_notified, + signatures_time, ); } } diff --git a/core/src/validator.rs b/core/src/validator.rs index e1adc6e08..26d0984c1 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -99,6 +99,7 @@ pub struct ValidatorConfig { pub poh_verify: bool, // Perform PoH verification during blockstore processing at boo pub cuda: bool, pub require_tower: bool, + pub debug_keys: Option>>, } impl Default for ValidatorConfig { @@ -132,6 +133,7 @@ impl Default for ValidatorConfig { poh_verify: true, cuda: false, require_tower: false, + debug_keys: None, } } } @@ -788,6 +790,7 @@ fn new_banks_from_ledger( dev_halt_at_slot: config.dev_halt_at_slot, new_hard_forks: config.new_hard_forks.clone(), frozen_accounts: config.frozen_accounts.clone(), + debug_keys: config.debug_keys.clone(), ..blockstore_processor::ProcessOptions::default() }; diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs index a52837666..ec386c84e 100644 --- a/core/tests/bank_forks.rs +++ b/core/tests/bank_forks.rs @@ -88,6 +88,7 @@ mod tests { &genesis_config_info.genesis_config, vec![accounts_dir.path().to_path_buf()], &[], + None, ); bank0.freeze(); let mut bank_forks = BankForks::new(bank0); @@ -141,6 +142,7 @@ mod tests { ), CompressionType::Bzip2, old_genesis_config, + None, ) .unwrap(); diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 4002b7898..3447475e3 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -65,6 +65,7 @@ pub fn load( &archive_filename, compression, genesis_config, + process_options.debug_keys.clone(), ) .expect("Load from snapshot failed"); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 793fce24a..115e82464 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -36,7 +36,7 @@ use solana_sdk::{ use solana_vote_program::vote_state::VoteState; use std::{ cell::RefCell, - collections::HashMap, + collections::{HashMap, HashSet}, path::PathBuf, result, sync::Arc, @@ -311,6 +311,7 @@ pub struct ProcessOptions { pub override_num_threads: Option, pub new_hard_forks: Option>, pub frozen_accounts: Vec, + pub debug_keys: Option>>, } fn initiate_callback(mut bank: &mut Arc, genesis_config: &GenesisConfig) { @@ -341,6 +342,7 @@ pub fn process_blockstore( &genesis_config, account_paths, &opts.frozen_accounts, + opts.debug_keys.clone(), )); initiate_callback(&mut bank0, genesis_config); info!("processing ledger for slot 0..."); @@ -2846,7 +2848,7 @@ pub mod tests { genesis_config: &GenesisConfig, account_paths: Vec, ) -> EpochSchedule { - let bank = Bank::new_with_paths(&genesis_config, account_paths, &[]); + let bank = Bank::new_with_paths(&genesis_config, account_paths, &[], None); *bank.epoch_schedule() } diff --git a/runtime/benches/accounts.rs b/runtime/benches/accounts.rs index 5ba533a5b..766467a80 100644 --- a/runtime/benches/accounts.rs +++ b/runtime/benches/accounts.rs @@ -36,7 +36,7 @@ fn bench_has_duplicates(bencher: &mut Bencher) { #[bench] fn test_accounts_create(bencher: &mut Bencher) { let (genesis_config, _) = create_genesis_config(10_000); - let bank0 = Bank::new_with_paths(&genesis_config, vec![PathBuf::from("bench_a0")], &[]); + let bank0 = Bank::new_with_paths(&genesis_config, vec![PathBuf::from("bench_a0")], &[], None); bencher.iter(|| { let mut pubkeys: Vec = vec![]; deposit_many(&bank0, &mut pubkeys, 1000); @@ -50,6 +50,7 @@ fn test_accounts_squash(bencher: &mut Bencher) { &genesis_config, vec![PathBuf::from("bench_a1")], &[], + None, )); let mut pubkeys: Vec = vec![]; deposit_many(&bank1, &mut pubkeys, 250_000); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 893973183..4da18f215 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -516,6 +516,8 @@ pub struct Bank { /// Cached executors cached_executors: Arc>, + + transaction_debug_keys: Option>>, } impl Default for BlockhashQueue { @@ -526,15 +528,17 @@ impl Default for BlockhashQueue { impl Bank { pub fn new(genesis_config: &GenesisConfig) -> Self { - Self::new_with_paths(&genesis_config, Vec::new(), &[]) + Self::new_with_paths(&genesis_config, Vec::new(), &[], None) } pub fn new_with_paths( genesis_config: &GenesisConfig, paths: Vec, frozen_account_pubkeys: &[Pubkey], + debug_keys: Option>>, ) -> Self { let mut bank = Self::default(); + bank.transaction_debug_keys = debug_keys; bank.cluster_type = Some(genesis_config.cluster_type); bank.ancestors.insert(bank.slot(), 0); @@ -636,6 +640,7 @@ impl Bank { lazy_rent_collection: AtomicBool::new(parent.lazy_rent_collection.load(Relaxed)), rewards_pool_pubkeys: parent.rewards_pool_pubkeys.clone(), cached_executors: parent.cached_executors.clone(), + transaction_debug_keys: parent.transaction_debug_keys.clone(), }; datapoint_info!( @@ -688,6 +693,7 @@ impl Bank { bank_rc: BankRc, genesis_config: &GenesisConfig, fields: BankFieldsToDeserialize, + debug_keys: Option>>, ) -> Self { fn new() -> T { T::default() @@ -738,6 +744,7 @@ impl Bank { lazy_rent_collection: new(), rewards_pool_pubkeys: new(), cached_executors: Arc::new(RwLock::new(CachedExecutors::new(MAX_CACHED_EXECUTORS))), + transaction_debug_keys: debug_keys, }; bank.finish_init(genesis_config); @@ -2015,6 +2022,14 @@ impl Bank { let mut tx_count: u64 = 0; let err_count = &mut error_counters.total; for ((r, _hash_age_kind), tx) in executed.iter().zip(txs.iter()) { + if let Some(debug_keys) = &self.transaction_debug_keys { + for key in &tx.message.account_keys { + if debug_keys.contains(key) { + info!("slot: {} result: {:?} tx: {:?}", self.slot, r, tx); + break; + } + } + } if r.is_ok() { tx_count += 1; } else { diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index 39054414e..a3c385766 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -29,7 +29,7 @@ use { pubkey::Pubkey, }, std::{ - collections::HashMap, + collections::{HashMap, HashSet}, io::{BufReader, BufWriter, Read, Write}, path::{Path, PathBuf}, result::Result, @@ -124,6 +124,7 @@ pub(crate) fn bank_from_stream( account_paths: &[PathBuf], genesis_config: &GenesisConfig, frozen_account_pubkeys: &[Pubkey], + debug_keys: Option>>, ) -> std::result::Result where R: Read, @@ -140,6 +141,7 @@ where frozen_account_pubkeys, account_paths, append_vecs_path, + debug_keys, )?; Ok(bank) }}; @@ -224,6 +226,7 @@ fn reconstruct_bank_from_fields( frozen_account_pubkeys: &[Pubkey], account_paths: &[PathBuf], append_vecs_path: P, + debug_keys: Option>>, ) -> Result where E: Into, @@ -238,7 +241,7 @@ where accounts_db.freeze_accounts(&bank_fields.ancestors, frozen_account_pubkeys); let bank_rc = BankRc::new(Accounts::new_empty(accounts_db), bank_fields.slot); - let bank = Bank::new_from_fields(bank_rc, genesis_config, bank_fields); + let bank = Bank::new_from_fields(bank_rc, genesis_config, bank_fields, debug_keys); Ok(bank) } diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs index ee21da53a..847eb303e 100644 --- a/runtime/src/serde_snapshot/tests.rs +++ b/runtime/src/serde_snapshot/tests.rs @@ -211,6 +211,7 @@ fn test_bank_serialize_style(serde_style: SerdeStyle) { &dbank_paths, &genesis_config, &[], + None, ) .unwrap(); dbank.src = ref_sc; diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 6b609dae7..f6e5f54d7 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -20,6 +20,8 @@ use solana_sdk::{ hash::Hash, pubkey::Pubkey, }; +use std::collections::HashSet; +use std::sync::Arc; use std::{ cmp::Ordering, fmt, @@ -571,6 +573,7 @@ pub fn bank_from_archive>( snapshot_tar: P, compression: CompressionType, genesis_config: &GenesisConfig, + debug_keys: Option>>, ) -> Result { // Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()` let unpack_dir = tempfile::tempdir_in(snapshot_path)?; @@ -591,6 +594,7 @@ pub fn bank_from_archive>( &unpacked_snapshots_dir, unpacked_accounts_dir, genesis_config, + debug_keys, )?; if !bank.verify_snapshot_bank() { @@ -748,6 +752,7 @@ fn rebuild_bank_from_snapshots

( unpacked_snapshots_dir: &PathBuf, append_vecs_path: P, genesis_config: &GenesisConfig, + debug_keys: Option>>, ) -> Result where P: AsRef, @@ -779,6 +784,7 @@ where account_paths, genesis_config, frozen_account_pubkeys, + debug_keys, ), }?) })?; diff --git a/validator/src/main.rs b/validator/src/main.rs index 2957f0481..88c1917b8 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1127,6 +1127,15 @@ pub fn main() { .help("A snapshot hash must be published in gossip by this validator to be accepted. \ May be specified multiple times. If unspecified any snapshot hash will be accepted"), ) + .arg( + Arg::with_name("debug_key") + .long("debug-key") + .validator(is_pubkey) + .value_name("PUBKEY") + .multiple(true) + .takes_value(true) + .help("Log when transactions are processed which reference a given key."), + ) .arg( Arg::with_name("no_untrusted_rpc") .long("no-untrusted-rpc") @@ -1264,6 +1273,16 @@ pub fn main() { exit(1); }); + let debug_keys: Option>> = if matches.is_present("debug_key") { + Some(Arc::new( + values_t_or_exit!(matches, "debug_key", Pubkey) + .into_iter() + .collect(), + )) + } else { + None + }; + let trusted_validators = validators_set( &identity_keypair.pubkey(), &matches, @@ -1339,6 +1358,7 @@ pub fn main() { no_rocksdb_compaction, wal_recovery_mode, poh_verify: !matches.is_present("skip_poh_verify"), + debug_keys, ..ValidatorConfig::default() };