From 058738424d4cbc71babe56988af70406d4dea8a7 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 9 Feb 2023 08:34:02 -0800 Subject: [PATCH] BankingStage Refactor: transaction recorder record transactions (#30106) --- core/src/banking_stage.rs | 67 +++-------------- ...eader_slot_banking_stage_timing_metrics.rs | 19 +---- poh/src/poh_recorder.rs | 75 ++++++++++++++++++- 3 files changed, 84 insertions(+), 77 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 7b8516e51..0c3ababc7 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -15,9 +15,7 @@ use { immutable_deserialized_packet::ImmutableDeserializedPacket, latest_unprocessed_votes::{LatestUnprocessedVotes, VoteSource}, leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary}, - leader_slot_banking_stage_timing_metrics::{ - LeaderExecuteAndCommitTimings, RecordTransactionsTimings, - }, + leader_slot_banking_stage_timing_metrics::LeaderExecuteAndCommitTimings, qos_service::QosService, tracer_packet_stats::TracerPacketStats, unprocessed_packet_batches::*, @@ -29,7 +27,6 @@ use { histogram::Histogram, itertools::Itertools, solana_client::connection_cache::ConnectionCache, - solana_entry::entry::hash_transactions, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ blockstore_processor::TransactionStatusSender, token_balances::collect_token_balances, @@ -37,7 +34,10 @@ use { solana_measure::{measure, measure::Measure, measure_us}, solana_metrics::inc_new_counter_info, solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH}, - solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder}, + solana_poh::poh_recorder::{ + BankStart, PohRecorder, PohRecorderError, RecordTransactionsSummary, + RecordTransactionsTimings, TransactionRecorder, + }, solana_program_runtime::timings::ExecuteTimings, solana_runtime::{ bank::{Bank, LoadAndExecuteTransactionsOutput, TransactionCheckResult}, @@ -47,11 +47,11 @@ use { vote_sender_types::ReplayVoteSender, }, solana_sdk::{ - clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE}, + clock::{FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE}, feature_set::allow_votes_to_directly_update_vote_state, pubkey::Pubkey, timing::{timestamp, AtomicInterval}, - transaction::{self, SanitizedTransaction, TransactionError, VersionedTransaction}, + transaction::{self, SanitizedTransaction, TransactionError}, }, solana_transaction_status::TransactionTokenBalance, std::{ @@ -93,15 +93,6 @@ pub struct ProcessTransactionBatchOutput { execute_and_commit_transactions_output: ExecuteAndCommitTransactionsOutput, } -struct RecordTransactionsSummary { - // Metrics describing how time was spent recording transactions - record_transactions_timings: RecordTransactionsTimings, - // Result of trying to record the transactions into the PoH stream - result: Result<(), PohRecorderError>, - // Index in the slot of the first transaction recorded - starting_transaction_index: Option, -} - pub struct ExecuteAndCommitTransactionsOutput { // Total number of transactions that were passed as candidates for execution transactions_attempted_execution_count: usize, @@ -772,44 +763,6 @@ impl BankingStage { ) } - fn record_transactions( - bank_slot: Slot, - transactions: Vec, - recorder: &TransactionRecorder, - ) -> RecordTransactionsSummary { - let mut record_transactions_timings = RecordTransactionsTimings::default(); - let mut starting_transaction_index = None; - - if !transactions.is_empty() { - let (hash, hash_time) = measure!(hash_transactions(&transactions), "hash"); - record_transactions_timings.hash_us = hash_time.as_us(); - - let (res, poh_record_time) = - measure!(recorder.record(bank_slot, hash, transactions), "hash"); - record_transactions_timings.poh_record_us = poh_record_time.as_us(); - - match res { - Ok(starting_index) => { - starting_transaction_index = starting_index; - } - Err(PohRecorderError::MaxHeightReached) => { - return RecordTransactionsSummary { - record_transactions_timings, - result: Err(PohRecorderError::MaxHeightReached), - starting_transaction_index: None, - }; - } - Err(e) => panic!("Poh recorder returned unexpected error: {e:?}"), - } - } - - RecordTransactionsSummary { - record_transactions_timings, - result: Ok(()), - starting_transaction_index, - } - } - fn execute_and_commit_transactions_locked( bank: &Arc, poh: &TransactionRecorder, @@ -889,7 +842,7 @@ impl BankingStage { ); } let (record_transactions_summary, record_time) = measure!( - Self::record_transactions(bank.slot(), executed_transactions, poh), + poh.record_transactions(bank.slot(), executed_transactions), "record_transactions", ); execute_and_commit_timings.record_us = record_time.as_us(); @@ -1819,7 +1772,7 @@ mod tests { system_transaction::transfer(&keypair2, &pubkey2, 1, genesis_config.hash()).into(), ]; - let _ = BankingStage::record_transactions(bank.slot(), txs.clone(), &recorder); + let _ = recorder.record_transactions(bank.slot(), txs.clone()); let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); assert_eq!(entry.transactions, txs); @@ -1827,7 +1780,7 @@ mod tests { // record_transactions should throw MaxHeightReached let next_slot = bank.slot() + 1; let RecordTransactionsSummary { result, .. } = - BankingStage::record_transactions(next_slot, txs, &recorder); + recorder.record_transactions(next_slot, txs); assert_matches!(result, Err(PohRecorderError::MaxHeightReached)); // Should receive nothing from PohRecorder b/c record failed assert!(entry_receiver.try_recv().is_err()); diff --git a/core/src/leader_slot_banking_stage_timing_metrics.rs b/core/src/leader_slot_banking_stage_timing_metrics.rs index a0977b4ea..543b80b4a 100644 --- a/core/src/leader_slot_banking_stage_timing_metrics.rs +++ b/core/src/leader_slot_banking_stage_timing_metrics.rs @@ -1,4 +1,5 @@ use { + solana_poh::poh_recorder::RecordTransactionsTimings, solana_program_runtime::timings::ExecuteTimings, solana_sdk::{clock::Slot, saturating_add_assign}, std::time::Instant, @@ -70,24 +71,6 @@ impl LeaderExecuteAndCommitTimings { } } -#[derive(Default, Debug)] -pub struct RecordTransactionsTimings { - pub execution_results_to_transactions_us: u64, - pub hash_us: u64, - pub poh_record_us: u64, -} - -impl RecordTransactionsTimings { - pub fn accumulate(&mut self, other: &RecordTransactionsTimings) { - saturating_add_assign!( - self.execution_results_to_transactions_us, - other.execution_results_to_transactions_us - ); - saturating_add_assign!(self.hash_us, other.hash_us); - saturating_add_assign!(self.poh_record_us, other.poh_record_us); - } -} - // Metrics capturing wallclock time spent in various parts of BankingStage during this // validator's leader slot #[derive(Debug)] diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index df0b88833..0ad709ab9 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -15,7 +15,10 @@ use { crate::poh_service::PohService, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender, TrySendError}, log::*, - solana_entry::{entry::Entry, poh::Poh}, + solana_entry::{ + entry::{hash_transactions, Entry}, + poh::Poh, + }, solana_ledger::{ blockstore::Blockstore, genesis_utils::{create_genesis_config, GenesisConfigInfo}, @@ -26,7 +29,7 @@ use { solana_runtime::bank::Bank, solana_sdk::{ clock::NUM_CONSECUTIVE_LEADER_SLOTS, hash::Hash, poh_config::PohConfig, pubkey::Pubkey, - transaction::VersionedTransaction, + saturating_add_assign, transaction::VersionedTransaction, }, std::{ cmp, @@ -110,6 +113,33 @@ impl Record { } } +#[derive(Default, Debug)] +pub struct RecordTransactionsTimings { + pub execution_results_to_transactions_us: u64, + pub hash_us: u64, + pub poh_record_us: u64, +} + +impl RecordTransactionsTimings { + pub fn accumulate(&mut self, other: &RecordTransactionsTimings) { + saturating_add_assign!( + self.execution_results_to_transactions_us, + other.execution_results_to_transactions_us + ); + saturating_add_assign!(self.hash_us, other.hash_us); + saturating_add_assign!(self.poh_record_us, other.poh_record_us); + } +} + +pub struct RecordTransactionsSummary { + // Metrics describing how time was spent recording transactions + pub record_transactions_timings: RecordTransactionsTimings, + // Result of trying to record the transactions into the PoH stream + pub result: Result<()>, + // Index in the slot of the first transaction recorded + pub starting_transaction_index: Option, +} + pub struct TransactionRecorder { // shared by all users of PohRecorder pub record_sender: Sender, @@ -131,6 +161,47 @@ impl TransactionRecorder { is_exited, } } + + /// Hashes `transactions` and sends to PoH service for recording. Waits for response up to 1s. + /// Panics on unexpected (non-`MaxHeightReached`) errors. + pub fn record_transactions( + &self, + bank_slot: Slot, + transactions: Vec, + ) -> RecordTransactionsSummary { + let mut record_transactions_timings = RecordTransactionsTimings::default(); + let mut starting_transaction_index = None; + + if !transactions.is_empty() { + let (hash, hash_time) = measure!(hash_transactions(&transactions), "hash"); + record_transactions_timings.hash_us = hash_time.as_us(); + + let (res, poh_record_time) = + measure!(self.record(bank_slot, hash, transactions), "hash"); + record_transactions_timings.poh_record_us = poh_record_time.as_us(); + + match res { + Ok(starting_index) => { + starting_transaction_index = starting_index; + } + Err(PohRecorderError::MaxHeightReached) => { + return RecordTransactionsSummary { + record_transactions_timings, + result: Err(PohRecorderError::MaxHeightReached), + starting_transaction_index: None, + }; + } + Err(e) => panic!("Poh recorder returned unexpected error: {e:?}"), + } + } + + RecordTransactionsSummary { + record_transactions_timings, + result: Ok(()), + starting_transaction_index, + } + } + // Returns the index of `transactions.first()` in the slot, if being tracked by WorkingBank pub fn record( &self,