Remove obsoleted metrics reporting to reduce lock contention on cost_model (#26608)

remove obsoleted metrics reporting to reduce lock contention on cost_model
This commit is contained in:
Tao Zhu 2022-07-14 23:02:49 -05:00 committed by GitHub
parent 0b78a213c3
commit f13b5c832d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 2 additions and 163 deletions

View File

@ -52,7 +52,6 @@ use {
bank::{Bank, NewBankOptions},
bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY},
commitment::BlockCommitmentCache,
transaction_cost_metrics_sender::TransactionCostMetricsSender,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{
@ -374,7 +373,6 @@ impl ReplayStage {
voting_sender: Sender<VoteOp>,
drop_bank_sender: Sender<Vec<Arc<Bank>>>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
transaction_cost_metrics_sender: Option<TransactionCostMetricsSender>,
log_messages_bytes_limit: Option<usize>,
) -> Self {
let mut tower = if let Some(process_blockstore) = maybe_process_blockstore {
@ -506,7 +504,6 @@ impl ReplayStage {
&mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
block_metadata_notifier.clone(),
transaction_cost_metrics_sender.as_ref(),
&mut replay_timing,
log_messages_bytes_limit
);
@ -1690,7 +1687,6 @@ impl ReplayStage {
bank_progress: &mut ForkProgress,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: &ReplayVoteSender,
transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>,
verify_recyclers: &VerifyRecyclers,
log_messages_bytes_limit: Option<usize>,
) -> result::Result<usize, BlockstoreProcessorError> {
@ -1706,7 +1702,6 @@ impl ReplayStage {
false,
transaction_status_sender,
Some(replay_vote_sender),
transaction_cost_metrics_sender,
None,
verify_recyclers,
false,
@ -2209,7 +2204,6 @@ impl ReplayStage {
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>,
replay_timing: &mut ReplayTiming,
log_messages_bytes_limit: Option<usize>,
) -> bool {
@ -2262,7 +2256,6 @@ impl ReplayStage {
bank_progress,
transaction_status_sender,
replay_vote_sender,
transaction_cost_metrics_sender,
verify_recyclers,
log_messages_bytes_limit,
);
@ -3883,7 +3876,6 @@ pub(crate) mod tests {
bank1_progress,
None,
&replay_vote_sender,
None,
&VerifyRecyclers::default(),
None,
);

View File

@ -41,13 +41,8 @@ use {
rpc_subscriptions::RpcSubscriptions,
},
solana_runtime::{
accounts_background_service::AbsRequestSender,
bank_forks::BankForks,
commitment::BlockCommitmentCache,
cost_model::CostModel,
transaction_cost_metrics_sender::{
TransactionCostMetricsSender, TransactionCostMetricsService,
},
accounts_background_service::AbsRequestSender, bank_forks::BankForks,
commitment::BlockCommitmentCache, cost_model::CostModel,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
@ -71,7 +66,6 @@ pub struct Tvu {
voting_service: VotingService,
warm_quic_cache_service: Option<WarmQuicCacheService>,
drop_bank_service: DropBankService,
transaction_cost_metrics_service: TransactionCostMetricsService,
}
pub struct TvuSockets {
@ -269,14 +263,6 @@ impl Tvu {
let (drop_bank_sender, drop_bank_receiver) = unbounded();
let (tx_cost_metrics_sender, tx_cost_metrics_receiver) = unbounded();
let transaction_cost_metrics_sender = Some(TransactionCostMetricsSender::new(
cost_model.clone(),
tx_cost_metrics_sender,
));
let transaction_cost_metrics_service =
TransactionCostMetricsService::new(tx_cost_metrics_receiver);
let drop_bank_service = DropBankService::new(drop_bank_receiver);
let replay_stage = ReplayStage::new(
@ -300,7 +286,6 @@ impl Tvu {
voting_sender,
drop_bank_sender,
block_metadata_notifier,
transaction_cost_metrics_sender,
log_messages_bytes_limit,
);
@ -327,7 +312,6 @@ impl Tvu {
voting_service,
warm_quic_cache_service,
drop_bank_service,
transaction_cost_metrics_service,
}
}
@ -347,7 +331,6 @@ impl Tvu {
warmup_service.join()?;
}
self.drop_bank_service.join()?;
self.transaction_cost_metrics_service.join()?;
Ok(())
}
}

View File

@ -32,7 +32,6 @@ use {
cost_model::CostModel,
runtime_config::RuntimeConfig,
transaction_batch::TransactionBatch,
transaction_cost_metrics_sender::TransactionCostMetricsSender,
vote_account::VoteAccountsHashMap,
vote_sender_types::ReplayVoteSender,
},
@ -524,7 +523,6 @@ pub fn process_entries_for_tests(
None,
transaction_status_sender,
replay_vote_sender,
None,
&mut confirmation_timing,
Arc::new(RwLock::new(BlockCostCapacityMeter::default())),
None,
@ -543,7 +541,6 @@ fn process_entries_with_callback(
entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>,
confirmation_timing: &mut ConfirmationTiming,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
log_messages_bytes_limit: Option<usize>,
@ -585,11 +582,6 @@ fn process_entries_with_callback(
}
}
EntryType::Transactions(transactions) => {
if let Some(transaction_cost_metrics_sender) = transaction_cost_metrics_sender {
transaction_cost_metrics_sender
.send_cost_details(bank.clone(), transactions.iter());
}
let starting_index = *starting_index;
let transaction_indexes = if randomize {
let mut transactions_and_indexes: Vec<(SanitizedTransaction, usize)> =
@ -973,7 +965,6 @@ fn confirm_full_slot(
skip_verification,
transaction_status_sender,
replay_vote_sender,
None,
opts.entry_callback.as_ref(),
recyclers,
opts.allow_dead_slots,
@ -1100,7 +1091,6 @@ pub fn confirm_slot(
skip_verification: bool,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>,
entry_callback: Option<&ProcessCallback>,
recyclers: &VerifyRecyclers,
allow_dead_slots: bool,
@ -1130,7 +1120,6 @@ pub fn confirm_slot(
skip_verification,
transaction_status_sender,
replay_vote_sender,
transaction_cost_metrics_sender,
entry_callback,
recyclers,
log_messages_bytes_limit,
@ -1146,7 +1135,6 @@ fn confirm_slot_entries(
skip_verification: bool,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>,
entry_callback: Option<&ProcessCallback>,
recyclers: &VerifyRecyclers,
log_messages_bytes_limit: Option<usize>,
@ -1248,7 +1236,6 @@ fn confirm_slot_entries(
entry_callback,
transaction_status_sender,
replay_vote_sender,
transaction_cost_metrics_sender,
timing,
cost_capacity_meter,
log_messages_bytes_limit,
@ -4128,7 +4115,6 @@ pub mod tests {
None,
None,
None,
None,
&VerifyRecyclers::default(),
None,
)
@ -4272,7 +4258,6 @@ pub mod tests {
Some(&transaction_status_sender),
None,
None,
None,
&VerifyRecyclers::default(),
None,
)
@ -4318,7 +4303,6 @@ pub mod tests {
Some(&transaction_status_sender),
None,
None,
None,
&VerifyRecyclers::default(),
None,
)

View File

@ -72,7 +72,6 @@ pub mod status_cache;
mod storable_accounts;
mod system_instruction_processor;
pub mod transaction_batch;
pub mod transaction_cost_metrics_sender;
pub mod transaction_error_metrics;
pub mod vote_account;
pub mod vote_parser;

View File

@ -1,119 +0,0 @@
use {
crate::{bank::Bank, cost_model::CostModel},
crossbeam_channel::{Receiver, Sender},
log::*,
solana_sdk::{clock::Slot, signature::Signature, transaction::SanitizedTransaction},
std::{
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
},
};
pub enum TransactionCostMetrics {
TransactionCostDetail {
slot: Slot,
tx_signature: Signature,
signature_cost: u64,
write_lock_cost: u64,
data_bytes_cost: u64,
builtins_execution_cost: u64,
bpf_execution_cost: u64,
},
}
pub struct TransactionCostMetricsSender {
cost_model: Arc<RwLock<CostModel>>,
metrics_sender: Sender<TransactionCostMetrics>,
}
impl TransactionCostMetricsSender {
pub fn new(
cost_model: Arc<RwLock<CostModel>>,
metrics_sender: Sender<TransactionCostMetrics>,
) -> Self {
Self {
cost_model,
metrics_sender,
}
}
pub fn send_cost_details<'a>(
&self,
bank: Arc<Bank>,
txs: impl Iterator<Item = &'a SanitizedTransaction>,
) {
let cost_model = self.cost_model.read().unwrap();
txs.for_each(|tx| {
let cost = cost_model.calculate_cost(tx);
self.metrics_sender
.send(TransactionCostMetrics::TransactionCostDetail {
slot: bank.slot(),
tx_signature: *tx.signature(),
signature_cost: cost.signature_cost,
write_lock_cost: cost.write_lock_cost,
data_bytes_cost: cost.data_bytes_cost,
builtins_execution_cost: cost.builtins_execution_cost,
bpf_execution_cost: cost.bpf_execution_cost,
})
.unwrap_or_else(|err| {
warn!(
"transaction cost metrics service report cost detail failed: {:?}",
err
)
});
});
}
}
pub struct TransactionCostMetricsService {
thread_hdl: JoinHandle<()>,
}
impl TransactionCostMetricsService {
pub fn new(transaction_cost_metrics_receiver: Receiver<TransactionCostMetrics>) -> Self {
let thread_hdl = Builder::new()
.name("transaction_cost_metrics_service".to_string())
.spawn(move || {
Self::service_loop(transaction_cost_metrics_receiver);
})
.unwrap();
Self { thread_hdl }
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
fn service_loop(transaction_cost_metrics_receiver: Receiver<TransactionCostMetrics>) {
for tx_cost_metrics in transaction_cost_metrics_receiver.iter() {
match tx_cost_metrics {
TransactionCostMetrics::TransactionCostDetail {
slot,
tx_signature,
signature_cost,
write_lock_cost,
data_bytes_cost,
builtins_execution_cost,
bpf_execution_cost,
} => {
// report transaction cost details per slot|signature
datapoint_trace!(
"transaction-cost-details",
("slot", slot as i64, i64),
("tx_signature", tx_signature.to_string(), String),
("signature_cost", signature_cost as i64, i64),
("write_lock_cost", write_lock_cost as i64, i64),
("data_bytes_cost", data_bytes_cost as i64, i64),
(
"builtins_execution_cost",
builtins_execution_cost as i64,
i64
),
("bpf_execution_cost", bpf_execution_cost as i64, i64),
);
}
}
}
}
}