From f13b5c832dd7878439da7862ce5dcb3a51bc5dca Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Thu, 14 Jul 2022 23:02:49 -0500 Subject: [PATCH] Remove obsoleted metrics reporting to reduce lock contention on cost_model (#26608) remove obsoleted metrics reporting to reduce lock contention on cost_model --- core/src/replay_stage.rs | 8 -- core/src/tvu.rs | 21 +--- ledger/src/blockstore_processor.rs | 16 --- runtime/src/lib.rs | 1 - .../src/transaction_cost_metrics_sender.rs | 119 ------------------ 5 files changed, 2 insertions(+), 163 deletions(-) delete mode 100644 runtime/src/transaction_cost_metrics_sender.rs diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index a38e0e2cc..2202675e5 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -52,7 +52,6 @@ use { bank::{Bank, NewBankOptions}, bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY}, commitment::BlockCommitmentCache, - transaction_cost_metrics_sender::TransactionCostMetricsSender, vote_sender_types::ReplayVoteSender, }, solana_sdk::{ @@ -374,7 +373,6 @@ impl ReplayStage { voting_sender: Sender, drop_bank_sender: Sender>>, block_metadata_notifier: Option, - transaction_cost_metrics_sender: Option, log_messages_bytes_limit: Option, ) -> Self { let mut tower = if let Some(process_blockstore) = maybe_process_blockstore { @@ -506,7 +504,6 @@ impl ReplayStage { &mut duplicate_slots_to_repair, &ancestor_hashes_replay_update_sender, block_metadata_notifier.clone(), - transaction_cost_metrics_sender.as_ref(), &mut replay_timing, log_messages_bytes_limit ); @@ -1690,7 +1687,6 @@ impl ReplayStage { bank_progress: &mut ForkProgress, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: &ReplayVoteSender, - transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>, verify_recyclers: &VerifyRecyclers, log_messages_bytes_limit: Option, ) -> result::Result { @@ -1706,7 +1702,6 @@ impl ReplayStage { false, transaction_status_sender, Some(replay_vote_sender), - transaction_cost_metrics_sender, None, verify_recyclers, false, @@ -2209,7 +2204,6 @@ impl ReplayStage { duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, block_metadata_notifier: Option, - transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>, replay_timing: &mut ReplayTiming, log_messages_bytes_limit: Option, ) -> bool { @@ -2262,7 +2256,6 @@ impl ReplayStage { bank_progress, transaction_status_sender, replay_vote_sender, - transaction_cost_metrics_sender, verify_recyclers, log_messages_bytes_limit, ); @@ -3883,7 +3876,6 @@ pub(crate) mod tests { bank1_progress, None, &replay_vote_sender, - None, &VerifyRecyclers::default(), None, ); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 7fa62f6f7..9004131c5 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -41,13 +41,8 @@ use { rpc_subscriptions::RpcSubscriptions, }, solana_runtime::{ - accounts_background_service::AbsRequestSender, - bank_forks::BankForks, - commitment::BlockCommitmentCache, - cost_model::CostModel, - transaction_cost_metrics_sender::{ - TransactionCostMetricsSender, TransactionCostMetricsService, - }, + accounts_background_service::AbsRequestSender, bank_forks::BankForks, + commitment::BlockCommitmentCache, cost_model::CostModel, vote_sender_types::ReplayVoteSender, }, solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair}, @@ -71,7 +66,6 @@ pub struct Tvu { voting_service: VotingService, warm_quic_cache_service: Option, drop_bank_service: DropBankService, - transaction_cost_metrics_service: TransactionCostMetricsService, } pub struct TvuSockets { @@ -269,14 +263,6 @@ impl Tvu { let (drop_bank_sender, drop_bank_receiver) = unbounded(); - let (tx_cost_metrics_sender, tx_cost_metrics_receiver) = unbounded(); - let transaction_cost_metrics_sender = Some(TransactionCostMetricsSender::new( - cost_model.clone(), - tx_cost_metrics_sender, - )); - let transaction_cost_metrics_service = - TransactionCostMetricsService::new(tx_cost_metrics_receiver); - let drop_bank_service = DropBankService::new(drop_bank_receiver); let replay_stage = ReplayStage::new( @@ -300,7 +286,6 @@ impl Tvu { voting_sender, drop_bank_sender, block_metadata_notifier, - transaction_cost_metrics_sender, log_messages_bytes_limit, ); @@ -327,7 +312,6 @@ impl Tvu { voting_service, warm_quic_cache_service, drop_bank_service, - transaction_cost_metrics_service, } } @@ -347,7 +331,6 @@ impl Tvu { warmup_service.join()?; } self.drop_bank_service.join()?; - self.transaction_cost_metrics_service.join()?; Ok(()) } } diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 75ab24dc8..49ee4b64c 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -32,7 +32,6 @@ use { cost_model::CostModel, runtime_config::RuntimeConfig, transaction_batch::TransactionBatch, - transaction_cost_metrics_sender::TransactionCostMetricsSender, vote_account::VoteAccountsHashMap, vote_sender_types::ReplayVoteSender, }, @@ -524,7 +523,6 @@ pub fn process_entries_for_tests( None, transaction_status_sender, replay_vote_sender, - None, &mut confirmation_timing, Arc::new(RwLock::new(BlockCostCapacityMeter::default())), None, @@ -543,7 +541,6 @@ fn process_entries_with_callback( entry_callback: Option<&ProcessCallback>, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, - transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>, confirmation_timing: &mut ConfirmationTiming, cost_capacity_meter: Arc>, log_messages_bytes_limit: Option, @@ -585,11 +582,6 @@ fn process_entries_with_callback( } } EntryType::Transactions(transactions) => { - if let Some(transaction_cost_metrics_sender) = transaction_cost_metrics_sender { - transaction_cost_metrics_sender - .send_cost_details(bank.clone(), transactions.iter()); - } - let starting_index = *starting_index; let transaction_indexes = if randomize { let mut transactions_and_indexes: Vec<(SanitizedTransaction, usize)> = @@ -973,7 +965,6 @@ fn confirm_full_slot( skip_verification, transaction_status_sender, replay_vote_sender, - None, opts.entry_callback.as_ref(), recyclers, opts.allow_dead_slots, @@ -1100,7 +1091,6 @@ pub fn confirm_slot( skip_verification: bool, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, - transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>, entry_callback: Option<&ProcessCallback>, recyclers: &VerifyRecyclers, allow_dead_slots: bool, @@ -1130,7 +1120,6 @@ pub fn confirm_slot( skip_verification, transaction_status_sender, replay_vote_sender, - transaction_cost_metrics_sender, entry_callback, recyclers, log_messages_bytes_limit, @@ -1146,7 +1135,6 @@ fn confirm_slot_entries( skip_verification: bool, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, - transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>, entry_callback: Option<&ProcessCallback>, recyclers: &VerifyRecyclers, log_messages_bytes_limit: Option, @@ -1248,7 +1236,6 @@ fn confirm_slot_entries( entry_callback, transaction_status_sender, replay_vote_sender, - transaction_cost_metrics_sender, timing, cost_capacity_meter, log_messages_bytes_limit, @@ -4128,7 +4115,6 @@ pub mod tests { None, None, None, - None, &VerifyRecyclers::default(), None, ) @@ -4272,7 +4258,6 @@ pub mod tests { Some(&transaction_status_sender), None, None, - None, &VerifyRecyclers::default(), None, ) @@ -4318,7 +4303,6 @@ pub mod tests { Some(&transaction_status_sender), None, None, - None, &VerifyRecyclers::default(), None, ) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index a52a6ed07..172f2a80e 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -72,7 +72,6 @@ pub mod status_cache; mod storable_accounts; mod system_instruction_processor; pub mod transaction_batch; -pub mod transaction_cost_metrics_sender; pub mod transaction_error_metrics; pub mod vote_account; pub mod vote_parser; diff --git a/runtime/src/transaction_cost_metrics_sender.rs b/runtime/src/transaction_cost_metrics_sender.rs deleted file mode 100644 index eab9a2b9a..000000000 --- a/runtime/src/transaction_cost_metrics_sender.rs +++ /dev/null @@ -1,119 +0,0 @@ -use { - crate::{bank::Bank, cost_model::CostModel}, - crossbeam_channel::{Receiver, Sender}, - log::*, - solana_sdk::{clock::Slot, signature::Signature, transaction::SanitizedTransaction}, - std::{ - sync::{Arc, RwLock}, - thread::{self, Builder, JoinHandle}, - }, -}; - -pub enum TransactionCostMetrics { - TransactionCostDetail { - slot: Slot, - tx_signature: Signature, - signature_cost: u64, - write_lock_cost: u64, - data_bytes_cost: u64, - builtins_execution_cost: u64, - bpf_execution_cost: u64, - }, -} - -pub struct TransactionCostMetricsSender { - cost_model: Arc>, - metrics_sender: Sender, -} - -impl TransactionCostMetricsSender { - pub fn new( - cost_model: Arc>, - metrics_sender: Sender, - ) -> Self { - Self { - cost_model, - metrics_sender, - } - } - - pub fn send_cost_details<'a>( - &self, - bank: Arc, - txs: impl Iterator, - ) { - let cost_model = self.cost_model.read().unwrap(); - txs.for_each(|tx| { - let cost = cost_model.calculate_cost(tx); - self.metrics_sender - .send(TransactionCostMetrics::TransactionCostDetail { - slot: bank.slot(), - tx_signature: *tx.signature(), - signature_cost: cost.signature_cost, - write_lock_cost: cost.write_lock_cost, - data_bytes_cost: cost.data_bytes_cost, - builtins_execution_cost: cost.builtins_execution_cost, - bpf_execution_cost: cost.bpf_execution_cost, - }) - .unwrap_or_else(|err| { - warn!( - "transaction cost metrics service report cost detail failed: {:?}", - err - ) - }); - }); - } -} - -pub struct TransactionCostMetricsService { - thread_hdl: JoinHandle<()>, -} - -impl TransactionCostMetricsService { - pub fn new(transaction_cost_metrics_receiver: Receiver) -> Self { - let thread_hdl = Builder::new() - .name("transaction_cost_metrics_service".to_string()) - .spawn(move || { - Self::service_loop(transaction_cost_metrics_receiver); - }) - .unwrap(); - - Self { thread_hdl } - } - - pub fn join(self) -> thread::Result<()> { - self.thread_hdl.join() - } - - fn service_loop(transaction_cost_metrics_receiver: Receiver) { - for tx_cost_metrics in transaction_cost_metrics_receiver.iter() { - match tx_cost_metrics { - TransactionCostMetrics::TransactionCostDetail { - slot, - tx_signature, - signature_cost, - write_lock_cost, - data_bytes_cost, - builtins_execution_cost, - bpf_execution_cost, - } => { - // report transaction cost details per slot|signature - datapoint_trace!( - "transaction-cost-details", - ("slot", slot as i64, i64), - ("tx_signature", tx_signature.to_string(), String), - ("signature_cost", signature_cost as i64, i64), - ("write_lock_cost", write_lock_cost as i64, i64), - ("data_bytes_cost", data_bytes_cost as i64, i64), - ( - "builtins_execution_cost", - builtins_execution_cost as i64, - i64 - ), - ("bpf_execution_cost", bpf_execution_cost as i64, i64), - ); - } - } - } - } -}