From a724fa2347a8ee19396b04cde5da24aac210c3b9 Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Fri, 14 Jan 2022 18:31:21 -0600 Subject: [PATCH] Add hidden cli option to allow validator reports replayed transaction cost metrics (#22369) * add hidden cli option to allow validator reports replayed transaction cost detail metrics * Update validator/src/main.rs Co-authored-by: Michael Vines * - rebase master, using unbounded instead of channel; dowgrade to datapoint_trace * removed cli arg, prefer log at trace Co-authored-by: Michael Vines --- core/src/replay_stage.rs | 8 ++ core/src/tvu.rs | 16 +++ ledger/src/blockstore_processor.rs | 11 ++ runtime/src/lib.rs | 1 + .../src/transaction_cost_metrics_sender.rs | 111 ++++++++++++++++++ 5 files changed, 147 insertions(+) create mode 100644 runtime/src/transaction_cost_metrics_sender.rs diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 7d9fb24c6..1bb1bbfe9 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -51,6 +51,7 @@ use { bank::{Bank, NewBankOptions}, bank_forks::BankForks, commitment::BlockCommitmentCache, + transaction_cost_metrics_sender::TransactionCostMetricsSender, vote_sender_types::ReplayVoteSender, }, solana_sdk::{ @@ -339,6 +340,7 @@ impl ReplayStage { voting_sender: Sender, drop_bank_sender: Sender>>, block_metadata_notifier: Option, + transaction_cost_metrics_sender: Option, ) -> Self { let ReplayStageConfig { vote_account, @@ -445,6 +447,7 @@ impl ReplayStage { &mut duplicate_slots_to_repair, &ancestor_hashes_replay_update_sender, block_metadata_notifier.clone(), + transaction_cost_metrics_sender.as_ref(), ); replay_active_banks_time.stop(); @@ -1570,6 +1573,7 @@ impl ReplayStage { bank_progress: &mut ForkProgress, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: &ReplayVoteSender, + transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>, verify_recyclers: &VerifyRecyclers, ) -> result::Result { let tx_count_before = bank_progress.replay_progress.num_txs; @@ -1581,6 +1585,7 @@ impl ReplayStage { false, transaction_status_sender, Some(replay_vote_sender), + transaction_cost_metrics_sender, None, verify_recyclers, false, @@ -2070,6 +2075,7 @@ impl ReplayStage { duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, block_metadata_notifier: Option, + transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>, ) -> bool { let mut did_complete_bank = false; let mut tx_count = 0; @@ -2119,6 +2125,7 @@ impl ReplayStage { bank_progress, transaction_status_sender, replay_vote_sender, + transaction_cost_metrics_sender, verify_recyclers, ); match replay_result { @@ -3652,6 +3659,7 @@ pub mod tests { bank1_progress, None, &replay_vote_sender, + None, &VerifyRecyclers::default(), ); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index b9029d3bc..119ccd6e6 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -49,6 +49,9 @@ use { snapshot_package::{ AccountsPackageReceiver, AccountsPackageSender, PendingSnapshotPackage, }, + transaction_cost_metrics_sender::{ + TransactionCostMetricsSender, TransactionCostMetricsService, + }, vote_sender_types::ReplayVoteSender, }, solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair}, @@ -72,6 +75,7 @@ pub struct Tvu { cost_update_service: CostUpdateService, voting_service: VotingService, drop_bank_service: DropBankService, + transaction_cost_metrics_service: TransactionCostMetricsService, } pub struct TvuSockets { @@ -309,6 +313,15 @@ impl Tvu { ); let (drop_bank_sender, drop_bank_receiver) = unbounded(); + + let (tx_cost_metrics_sender, tx_cost_metrics_receiver) = unbounded(); + let transaction_cost_metrics_sender = Some(TransactionCostMetricsSender::new( + cost_model.clone(), + tx_cost_metrics_sender, + )); + let transaction_cost_metrics_service = + TransactionCostMetricsService::new(tx_cost_metrics_receiver); + let drop_bank_service = DropBankService::new(drop_bank_receiver); let replay_stage = ReplayStage::new( @@ -332,6 +345,7 @@ impl Tvu { voting_sender, drop_bank_sender, block_metadata_notifier, + transaction_cost_metrics_sender, ); let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { @@ -366,6 +380,7 @@ impl Tvu { cost_update_service, voting_service, drop_bank_service, + transaction_cost_metrics_service, } } @@ -382,6 +397,7 @@ impl Tvu { self.cost_update_service.join()?; self.voting_service.join()?; self.drop_bank_service.join()?; + self.transaction_cost_metrics_service.join()?; Ok(()) } } diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 5a80e43ec..ba48d02c0 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -32,6 +32,7 @@ use { snapshot_package::{AccountsPackageSender, SnapshotType}, snapshot_utils::{self, BankFromArchiveTimings}, transaction_batch::TransactionBatch, + transaction_cost_metrics_sender::TransactionCostMetricsSender, vote_account::VoteAccount, vote_sender_types::ReplayVoteSender, }, @@ -317,6 +318,7 @@ pub fn process_entries_for_tests( None, transaction_status_sender, replay_vote_sender, + None, &mut timings, Arc::new(RwLock::new(BlockCostCapacityMeter::default())), ); @@ -333,6 +335,7 @@ fn process_entries_with_callback( entry_callback: Option<&ProcessCallback>, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, + transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>, timings: &mut ExecuteTimings, cost_capacity_meter: Arc>, ) -> Result<()> { @@ -366,6 +369,11 @@ fn process_entries_with_callback( } } EntryType::Transactions(transactions) => { + if let Some(transaction_cost_metrics_sender) = transaction_cost_metrics_sender { + transaction_cost_metrics_sender + .send_cost_details(bank.clone(), transactions.iter()); + } + if randomize { transactions.shuffle(&mut rng); } @@ -791,6 +799,7 @@ fn confirm_full_slot( skip_verification, transaction_status_sender, replay_vote_sender, + None, opts.entry_callback.as_ref(), recyclers, opts.allow_dead_slots, @@ -858,6 +867,7 @@ pub fn confirm_slot( skip_verification: bool, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, + transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>, entry_callback: Option<&ProcessCallback>, recyclers: &VerifyRecyclers, allow_dead_slots: bool, @@ -954,6 +964,7 @@ pub fn confirm_slot( entry_callback, transaction_status_sender, replay_vote_sender, + transaction_cost_metrics_sender, &mut execute_timings, cost_capacity_meter, ) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 96b8196f9..ab6be2d24 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -57,6 +57,7 @@ pub mod stakes; pub mod status_cache; mod system_instruction_processor; pub mod transaction_batch; +pub mod transaction_cost_metrics_sender; pub mod vote_account; pub mod vote_sender_types; pub mod waitable_condvar; diff --git a/runtime/src/transaction_cost_metrics_sender.rs b/runtime/src/transaction_cost_metrics_sender.rs new file mode 100644 index 000000000..af6dd203b --- /dev/null +++ b/runtime/src/transaction_cost_metrics_sender.rs @@ -0,0 +1,111 @@ +use { + crate::{bank::Bank, cost_model::CostModel}, + crossbeam_channel::{Receiver, Sender}, + log::*, + solana_sdk::{clock::Slot, signature::Signature, transaction::SanitizedTransaction}, + std::{ + sync::{Arc, RwLock}, + thread::{self, Builder, JoinHandle}, + }, +}; + +pub enum TransactionCostMetrics { + TransactionCostDetail { + slot: Slot, + tx_signature: Signature, + signature_cost: u64, + write_lock_cost: u64, + data_bytes_cost: u64, + execution_cost: u64, + }, +} + +pub struct TransactionCostMetricsSender { + cost_model: Arc>, + metrics_sender: Sender, +} + +impl TransactionCostMetricsSender { + pub fn new( + cost_model: Arc>, + metrics_sender: Sender, + ) -> Self { + Self { + cost_model, + metrics_sender, + } + } + + pub fn send_cost_details<'a>( + &self, + bank: Arc, + txs: impl Iterator, + ) { + let cost_model = self.cost_model.read().unwrap(); + txs.for_each(|tx| { + let cost = cost_model.calculate_cost(tx); + self.metrics_sender + .send(TransactionCostMetrics::TransactionCostDetail { + slot: bank.slot(), + tx_signature: *tx.signature(), + signature_cost: cost.signature_cost, + write_lock_cost: cost.write_lock_cost, + data_bytes_cost: cost.data_bytes_cost, + execution_cost: cost.execution_cost, + }) + .unwrap_or_else(|err| { + warn!( + "transaction cost metrics service report cost detail failed: {:?}", + err + ) + }); + }); + } +} + +pub struct TransactionCostMetricsService { + thread_hdl: JoinHandle<()>, +} + +impl TransactionCostMetricsService { + pub fn new(transaction_cost_metrics_receiver: Receiver) -> Self { + let thread_hdl = Builder::new() + .name("transaction_cost_metrics_service".to_string()) + .spawn(move || { + Self::service_loop(transaction_cost_metrics_receiver); + }) + .unwrap(); + + Self { thread_hdl } + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } + + fn service_loop(transaction_cost_metrics_receiver: Receiver) { + for tx_cost_metrics in transaction_cost_metrics_receiver.iter() { + match tx_cost_metrics { + TransactionCostMetrics::TransactionCostDetail { + slot, + tx_signature, + signature_cost, + write_lock_cost, + data_bytes_cost, + execution_cost, + } => { + // report transaction cost details per slot|signature + datapoint_trace!( + "transaction-cost-details", + ("slot", slot as i64, i64), + ("tx_signature", tx_signature.to_string(), String), + ("signature_cost", signature_cost as i64, i64), + ("write_lock_cost", write_lock_cost as i64, i64), + ("data_bytes_cost", data_bytes_cost as i64, i64), + ("execution_cost", execution_cost as i64, i64), + ); + } + } + } + } +}