BankingStage Refactor: transaction recorder record transactions (#30106)

This commit is contained in:
Andrew Fitzgerald 2023-02-09 08:34:02 -08:00 committed by GitHub
parent b3887af7c6
commit 058738424d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 84 additions and 77 deletions

View File

@ -15,9 +15,7 @@ use {
immutable_deserialized_packet::ImmutableDeserializedPacket, immutable_deserialized_packet::ImmutableDeserializedPacket,
latest_unprocessed_votes::{LatestUnprocessedVotes, VoteSource}, latest_unprocessed_votes::{LatestUnprocessedVotes, VoteSource},
leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary}, leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary},
leader_slot_banking_stage_timing_metrics::{ leader_slot_banking_stage_timing_metrics::LeaderExecuteAndCommitTimings,
LeaderExecuteAndCommitTimings, RecordTransactionsTimings,
},
qos_service::QosService, qos_service::QosService,
tracer_packet_stats::TracerPacketStats, tracer_packet_stats::TracerPacketStats,
unprocessed_packet_batches::*, unprocessed_packet_batches::*,
@ -29,7 +27,6 @@ use {
histogram::Histogram, histogram::Histogram,
itertools::Itertools, itertools::Itertools,
solana_client::connection_cache::ConnectionCache, solana_client::connection_cache::ConnectionCache,
solana_entry::entry::hash_transactions,
solana_gossip::cluster_info::ClusterInfo, solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{ solana_ledger::{
blockstore_processor::TransactionStatusSender, token_balances::collect_token_balances, blockstore_processor::TransactionStatusSender, token_balances::collect_token_balances,
@ -37,7 +34,10 @@ use {
solana_measure::{measure, measure::Measure, measure_us}, solana_measure::{measure, measure::Measure, measure_us},
solana_metrics::inc_new_counter_info, solana_metrics::inc_new_counter_info,
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH}, solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder}, solana_poh::poh_recorder::{
BankStart, PohRecorder, PohRecorderError, RecordTransactionsSummary,
RecordTransactionsTimings, TransactionRecorder,
},
solana_program_runtime::timings::ExecuteTimings, solana_program_runtime::timings::ExecuteTimings,
solana_runtime::{ solana_runtime::{
bank::{Bank, LoadAndExecuteTransactionsOutput, TransactionCheckResult}, bank::{Bank, LoadAndExecuteTransactionsOutput, TransactionCheckResult},
@ -47,11 +47,11 @@ use {
vote_sender_types::ReplayVoteSender, vote_sender_types::ReplayVoteSender,
}, },
solana_sdk::{ solana_sdk::{
clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE}, clock::{FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE},
feature_set::allow_votes_to_directly_update_vote_state, feature_set::allow_votes_to_directly_update_vote_state,
pubkey::Pubkey, pubkey::Pubkey,
timing::{timestamp, AtomicInterval}, timing::{timestamp, AtomicInterval},
transaction::{self, SanitizedTransaction, TransactionError, VersionedTransaction}, transaction::{self, SanitizedTransaction, TransactionError},
}, },
solana_transaction_status::TransactionTokenBalance, solana_transaction_status::TransactionTokenBalance,
std::{ std::{
@ -93,15 +93,6 @@ pub struct ProcessTransactionBatchOutput {
execute_and_commit_transactions_output: ExecuteAndCommitTransactionsOutput, execute_and_commit_transactions_output: ExecuteAndCommitTransactionsOutput,
} }
struct RecordTransactionsSummary {
// Metrics describing how time was spent recording transactions
record_transactions_timings: RecordTransactionsTimings,
// Result of trying to record the transactions into the PoH stream
result: Result<(), PohRecorderError>,
// Index in the slot of the first transaction recorded
starting_transaction_index: Option<usize>,
}
pub struct ExecuteAndCommitTransactionsOutput { pub struct ExecuteAndCommitTransactionsOutput {
// Total number of transactions that were passed as candidates for execution // Total number of transactions that were passed as candidates for execution
transactions_attempted_execution_count: usize, transactions_attempted_execution_count: usize,
@ -772,44 +763,6 @@ impl BankingStage {
) )
} }
fn record_transactions(
bank_slot: Slot,
transactions: Vec<VersionedTransaction>,
recorder: &TransactionRecorder,
) -> RecordTransactionsSummary {
let mut record_transactions_timings = RecordTransactionsTimings::default();
let mut starting_transaction_index = None;
if !transactions.is_empty() {
let (hash, hash_time) = measure!(hash_transactions(&transactions), "hash");
record_transactions_timings.hash_us = hash_time.as_us();
let (res, poh_record_time) =
measure!(recorder.record(bank_slot, hash, transactions), "hash");
record_transactions_timings.poh_record_us = poh_record_time.as_us();
match res {
Ok(starting_index) => {
starting_transaction_index = starting_index;
}
Err(PohRecorderError::MaxHeightReached) => {
return RecordTransactionsSummary {
record_transactions_timings,
result: Err(PohRecorderError::MaxHeightReached),
starting_transaction_index: None,
};
}
Err(e) => panic!("Poh recorder returned unexpected error: {e:?}"),
}
}
RecordTransactionsSummary {
record_transactions_timings,
result: Ok(()),
starting_transaction_index,
}
}
fn execute_and_commit_transactions_locked( fn execute_and_commit_transactions_locked(
bank: &Arc<Bank>, bank: &Arc<Bank>,
poh: &TransactionRecorder, poh: &TransactionRecorder,
@ -889,7 +842,7 @@ impl BankingStage {
); );
} }
let (record_transactions_summary, record_time) = measure!( let (record_transactions_summary, record_time) = measure!(
Self::record_transactions(bank.slot(), executed_transactions, poh), poh.record_transactions(bank.slot(), executed_transactions),
"record_transactions", "record_transactions",
); );
execute_and_commit_timings.record_us = record_time.as_us(); execute_and_commit_timings.record_us = record_time.as_us();
@ -1819,7 +1772,7 @@ mod tests {
system_transaction::transfer(&keypair2, &pubkey2, 1, genesis_config.hash()).into(), system_transaction::transfer(&keypair2, &pubkey2, 1, genesis_config.hash()).into(),
]; ];
let _ = BankingStage::record_transactions(bank.slot(), txs.clone(), &recorder); let _ = recorder.record_transactions(bank.slot(), txs.clone());
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
assert_eq!(entry.transactions, txs); assert_eq!(entry.transactions, txs);
@ -1827,7 +1780,7 @@ mod tests {
// record_transactions should throw MaxHeightReached // record_transactions should throw MaxHeightReached
let next_slot = bank.slot() + 1; let next_slot = bank.slot() + 1;
let RecordTransactionsSummary { result, .. } = let RecordTransactionsSummary { result, .. } =
BankingStage::record_transactions(next_slot, txs, &recorder); recorder.record_transactions(next_slot, txs);
assert_matches!(result, Err(PohRecorderError::MaxHeightReached)); assert_matches!(result, Err(PohRecorderError::MaxHeightReached));
// Should receive nothing from PohRecorder b/c record failed // Should receive nothing from PohRecorder b/c record failed
assert!(entry_receiver.try_recv().is_err()); assert!(entry_receiver.try_recv().is_err());

View File

@ -1,4 +1,5 @@
use { use {
solana_poh::poh_recorder::RecordTransactionsTimings,
solana_program_runtime::timings::ExecuteTimings, solana_program_runtime::timings::ExecuteTimings,
solana_sdk::{clock::Slot, saturating_add_assign}, solana_sdk::{clock::Slot, saturating_add_assign},
std::time::Instant, std::time::Instant,
@ -70,24 +71,6 @@ impl LeaderExecuteAndCommitTimings {
} }
} }
#[derive(Default, Debug)]
pub struct RecordTransactionsTimings {
pub execution_results_to_transactions_us: u64,
pub hash_us: u64,
pub poh_record_us: u64,
}
impl RecordTransactionsTimings {
pub fn accumulate(&mut self, other: &RecordTransactionsTimings) {
saturating_add_assign!(
self.execution_results_to_transactions_us,
other.execution_results_to_transactions_us
);
saturating_add_assign!(self.hash_us, other.hash_us);
saturating_add_assign!(self.poh_record_us, other.poh_record_us);
}
}
// Metrics capturing wallclock time spent in various parts of BankingStage during this // Metrics capturing wallclock time spent in various parts of BankingStage during this
// validator's leader slot // validator's leader slot
#[derive(Debug)] #[derive(Debug)]

View File

@ -15,7 +15,10 @@ use {
crate::poh_service::PohService, crate::poh_service::PohService,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender, TrySendError}, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender, TrySendError},
log::*, log::*,
solana_entry::{entry::Entry, poh::Poh}, solana_entry::{
entry::{hash_transactions, Entry},
poh::Poh,
},
solana_ledger::{ solana_ledger::{
blockstore::Blockstore, blockstore::Blockstore,
genesis_utils::{create_genesis_config, GenesisConfigInfo}, genesis_utils::{create_genesis_config, GenesisConfigInfo},
@ -26,7 +29,7 @@ use {
solana_runtime::bank::Bank, solana_runtime::bank::Bank,
solana_sdk::{ solana_sdk::{
clock::NUM_CONSECUTIVE_LEADER_SLOTS, hash::Hash, poh_config::PohConfig, pubkey::Pubkey, clock::NUM_CONSECUTIVE_LEADER_SLOTS, hash::Hash, poh_config::PohConfig, pubkey::Pubkey,
transaction::VersionedTransaction, saturating_add_assign, transaction::VersionedTransaction,
}, },
std::{ std::{
cmp, cmp,
@ -110,6 +113,33 @@ impl Record {
} }
} }
#[derive(Default, Debug)]
pub struct RecordTransactionsTimings {
pub execution_results_to_transactions_us: u64,
pub hash_us: u64,
pub poh_record_us: u64,
}
impl RecordTransactionsTimings {
pub fn accumulate(&mut self, other: &RecordTransactionsTimings) {
saturating_add_assign!(
self.execution_results_to_transactions_us,
other.execution_results_to_transactions_us
);
saturating_add_assign!(self.hash_us, other.hash_us);
saturating_add_assign!(self.poh_record_us, other.poh_record_us);
}
}
pub struct RecordTransactionsSummary {
// Metrics describing how time was spent recording transactions
pub record_transactions_timings: RecordTransactionsTimings,
// Result of trying to record the transactions into the PoH stream
pub result: Result<()>,
// Index in the slot of the first transaction recorded
pub starting_transaction_index: Option<usize>,
}
pub struct TransactionRecorder { pub struct TransactionRecorder {
// shared by all users of PohRecorder // shared by all users of PohRecorder
pub record_sender: Sender<Record>, pub record_sender: Sender<Record>,
@ -131,6 +161,47 @@ impl TransactionRecorder {
is_exited, is_exited,
} }
} }
/// Hashes `transactions` and sends to PoH service for recording. Waits for response up to 1s.
/// Panics on unexpected (non-`MaxHeightReached`) errors.
pub fn record_transactions(
&self,
bank_slot: Slot,
transactions: Vec<VersionedTransaction>,
) -> RecordTransactionsSummary {
let mut record_transactions_timings = RecordTransactionsTimings::default();
let mut starting_transaction_index = None;
if !transactions.is_empty() {
let (hash, hash_time) = measure!(hash_transactions(&transactions), "hash");
record_transactions_timings.hash_us = hash_time.as_us();
let (res, poh_record_time) =
measure!(self.record(bank_slot, hash, transactions), "hash");
record_transactions_timings.poh_record_us = poh_record_time.as_us();
match res {
Ok(starting_index) => {
starting_transaction_index = starting_index;
}
Err(PohRecorderError::MaxHeightReached) => {
return RecordTransactionsSummary {
record_transactions_timings,
result: Err(PohRecorderError::MaxHeightReached),
starting_transaction_index: None,
};
}
Err(e) => panic!("Poh recorder returned unexpected error: {e:?}"),
}
}
RecordTransactionsSummary {
record_transactions_timings,
result: Ok(()),
starting_transaction_index,
}
}
// Returns the index of `transactions.first()` in the slot, if being tracked by WorkingBank // Returns the index of `transactions.first()` in the slot, if being tracked by WorkingBank
pub fn record( pub fn record(
&self, &self,