From dd80a525ef524d35047c01a7e87e2576fa77e36c Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Wed, 22 Dec 2021 15:39:59 -0600 Subject: [PATCH] Leader QoS service metrics (#21708) * - qos_service metrics tagged with leader thread ids to separate gossip/tpu votes and transactions; - qos_service metrics is reported with bank slot; - replaced timer-based reporting with signal via channel; removed async report test as qos_service now lives within a thread * - add tpu live packets (eg, not buffered packets) states to qos metrics reporting --- core/benches/banking_stage.rs | 2 +- core/src/banking_stage.rs | 49 ++++--- core/src/qos_service.rs | 241 ++++++++++++++++++++-------------- 3 files changed, 174 insertions(+), 118 deletions(-) diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index dbb0961af1..ef87086318 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -97,7 +97,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { None::>, &BankingStageStats::default(), &recorder, - &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), + &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), ); }); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 5e02f8f815..1383710775 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -11,7 +11,7 @@ use { solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_ledger::blockstore_processor::TransactionStatusSender, solana_measure::measure::Measure, - solana_metrics::{inc_new_counter_debug, inc_new_counter_info}, + solana_metrics::inc_new_counter_info, solana_perf::{ cuda_runtime::PinnedVec, data_budget::DataBudget, @@ -330,7 +330,6 @@ impl BankingStage { PacketHasher::default(), ))); let data_budget = Arc::new(DataBudget::default()); - let qos_service = Arc::new(QosService::new(cost_model)); // Many banks that process transactions in parallel. 
assert!(num_threads >= NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING); let bank_thread_hdls: Vec> = (0..num_threads) @@ -355,7 +354,7 @@ impl BankingStage { let gossip_vote_sender = gossip_vote_sender.clone(); let duplicates = duplicates.clone(); let data_budget = data_budget.clone(); - let qos_service = qos_service.clone(); + let cost_model = cost_model.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { @@ -371,7 +370,7 @@ impl BankingStage { gossip_vote_sender, &duplicates, &data_budget, - qos_service, + cost_model, ); }) .unwrap() @@ -445,7 +444,7 @@ impl BankingStage { test_fn: Option, banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, - qos_service: &Arc, + qos_service: &QosService, ) { let mut rebuffered_packet_count = 0; let mut new_tx_count = 0; @@ -502,6 +501,7 @@ impl BankingStage { )); } new_tx_count += processed; + // Out of the buffered packets just retried, collect any still unprocessed // transactions in this batch for forwarding rebuffered_packet_count += new_unprocessed_indexes.len(); @@ -594,7 +594,7 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, data_budget: &DataBudget, - qos_service: &Arc, + qos_service: &QosService, ) -> BufferedPacketsDecision { let bank_start; let ( @@ -714,12 +714,13 @@ impl BankingStage { gossip_vote_sender: ReplayVoteSender, duplicates: &Arc, PacketHasher)>>, data_budget: &DataBudget, - qos_service: Arc, + cost_model: Arc>, ) { let recorder = poh_recorder.lock().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit); let banking_stage_stats = BankingStageStats::new(id); + let qos_service = QosService::new(cost_model, id); loop { let my_pubkey = cluster_info.id(); while !buffered_packet_batches.is_empty() { @@ -968,7 +969,7 @@ impl BankingStage { chunk_offset: usize, transaction_status_sender: Option, gossip_vote_sender: 
&ReplayVoteSender, - qos_service: &Arc, + qos_service: &QosService, ) -> (Result, Vec) { let tx_costs = qos_service.compute_transaction_costs(txs.iter()); @@ -999,6 +1000,9 @@ impl BankingStage { drop(batch); unlock_time.stop(); + // reports qos service stats for this batch + qos_service.report_metrics(bank.clone()); + debug!( "bank: {} lock: {}us unlock: {}us txs_len: {}", bank.slot(), @@ -1021,7 +1025,7 @@ impl BankingStage { poh: &TransactionRecorder, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, - qos_service: &Arc, + qos_service: &QosService, ) -> (usize, Vec) { let mut chunk_start = 0; let mut unprocessed_txs = vec![]; @@ -1190,7 +1194,7 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, - qos_service: &Arc, + qos_service: &QosService, ) -> (usize, usize, Vec) { let mut packet_conversion_time = Measure::start("packet_conversion"); let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets( @@ -1330,7 +1334,7 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, duplicates: &Arc, PacketHasher)>>, recorder: &TransactionRecorder, - qos_service: &Arc, + qos_service: &QosService, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("process_packets_recv"); let packet_batches = verified_receiver.recv_timeout(recv_timeout)?; @@ -1345,7 +1349,6 @@ impl BankingStage { packet_count, id, ); - inc_new_counter_debug!("banking_stage-transactions_received", packet_count); let mut proc_start = Measure::start("process_packets_transactions_process"); let mut new_tx_count = 0; @@ -1358,6 +1361,8 @@ impl BankingStage { let poh_recorder_bank = poh.lock().unwrap().get_poh_recorder_bank(); let working_bank_start = poh_recorder_bank.working_bank_start(); if PohRecorder::get_working_bank_if_not_expired(&working_bank_start).is_none() { + qos_service + .accumulate_tpu_buffered_packets_count(packet_batch.packets.len() as 
u64); Self::push_unprocessed( buffered_packet_batches, packet_batch, @@ -1378,6 +1383,7 @@ impl BankingStage { bank_creation_time, } = &*working_bank_start.unwrap(); + qos_service.accumulate_tpu_ingested_packets_count(packet_batch.packets.len() as u64); let (processed, verified_txs_len, unprocessed_indexes) = Self::process_packets_transactions( working_bank, @@ -1392,6 +1398,9 @@ impl BankingStage { ); new_tx_count += processed; + qos_service.accumulated_verified_txs_count(verified_txs_len as u64); + qos_service.accumulated_processed_txs_count(processed as u64); + qos_service.accumulated_retryable_txs_count(unprocessed_indexes.len() as u64); // Collect any unprocessed transactions in this batch for forwarding Self::push_unprocessed( @@ -2272,7 +2281,7 @@ mod tests { 0, None, &gossip_vote_sender, - &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), + &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), ) .0 .unwrap(); @@ -2314,7 +2323,7 @@ mod tests { 0, None, &gossip_vote_sender, - &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), + &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), ) .0, Err(PohRecorderError::MaxHeightReached) @@ -2402,7 +2411,7 @@ mod tests { 0, None, &gossip_vote_sender, - &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), + &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), ); poh_recorder @@ -2511,7 +2520,7 @@ mod tests { &recorder, None, &gossip_vote_sender, - &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), + &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), ); assert_eq!(processed_transactions_count, 0,); @@ -2606,7 +2615,7 @@ mod tests { enable_cpi_and_log_storage: false, }), &gossip_vote_sender, - &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), + &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), ); transaction_status_service.join().unwrap(); @@ 
-2737,7 +2746,7 @@ mod tests { None::>, &BankingStageStats::default(), &recorder, - &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), + &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), ); assert_eq!( buffered_packet_batches[0].1.len(), @@ -2757,7 +2766,7 @@ mod tests { None::>, &BankingStageStats::default(), &recorder, - &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), + &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), ); if num_expected_unprocessed == 0 { assert!(buffered_packet_batches.is_empty()) @@ -2823,7 +2832,7 @@ mod tests { test_fn, &BankingStageStats::default(), &recorder, - &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), + &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), ); // Check everything is correct. All indexes after `interrupted_iteration` diff --git a/core/src/qos_service.rs b/core/src/qos_service.rs index 9725ee2713..eef711d1db 100644 --- a/core/src/qos_service.rs +++ b/core/src/qos_service.rs @@ -10,12 +10,13 @@ use { cost_tracker::CostTrackerError, }, solana_sdk::{ - timing::AtomicInterval, + clock::Slot, transaction::{self, SanitizedTransaction, TransactionError}, }, std::{ sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, + mpsc::{channel, Receiver, Sender}, Arc, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -23,13 +24,24 @@ use { }, }; +// QosService is local to each banking thread, each instance of QosService provides services to +// one banking thread. +// It hosts a private thread for async metrics reporting, tagged with banking thread's ID. Banking +// thread calls `report_metrics(&bank)` at end of `process_and_record_transactions()`, or any time +// it wants, QosService sends `&bank` to reporting thread via channel, signalling stats to be +// reported if new bank slot has changed. +// pub struct QosService { // cost_model instance is owned by validator, shared between replay_stage and // banking_stage. 
replay_stage writes the latest on-chain program timings to // it; banking_stage's qos_service reads that information to calculate // transaction cost, hence RwLock wrapped. cost_model: Arc>, + // QosService hosts metrics object and a private reporting thread, as well as sender to + // communicate with thread. + report_sender: Sender>, metrics: Arc, + // metrics reporting runs on a private thread reporting_thread: Option>, running_flag: Arc, } @@ -46,16 +58,10 @@ impl Drop for QosService { } impl QosService { - pub fn new(cost_model: Arc>) -> Self { - Self::new_with_reporting_duration(cost_model, 1000u64) - } - - pub fn new_with_reporting_duration( - cost_model: Arc>, - reporting_duration_ms: u64, - ) -> Self { + pub fn new(cost_model: Arc>, id: u32) -> Self { + let (report_sender, report_receiver) = channel(); let running_flag = Arc::new(AtomicBool::new(true)); - let metrics = Arc::new(QosServiceMetrics::default()); + let metrics = Arc::new(QosServiceMetrics::new(id)); let running_flag_clone = running_flag.clone(); let metrics_clone = metrics.clone(); @@ -63,18 +69,21 @@ impl QosService { Builder::new() .name("solana-qos-service-metrics-repoting".to_string()) .spawn(move || { - Self::reporting_loop(running_flag_clone, metrics_clone, reporting_duration_ms); + Self::reporting_loop(running_flag_clone, metrics_clone, report_receiver); }) .unwrap(), ); + Self { cost_model, metrics, reporting_thread, running_flag, + report_sender, } } + // invoke cost_model to calculate cost for the given list of transactions pub fn compute_transaction_costs<'a>( &self, transactions: impl Iterator, @@ -147,13 +156,53 @@ impl QosService { select_results } + // metrics are reported by bank slot + pub fn report_metrics(&self, bank: Arc) { + self.report_sender + .send(bank) + .unwrap_or_else(|err| warn!("qos service report metrics failed: {:?}", err)); + } + + // metrics accumulating apis + pub fn accumulate_tpu_ingested_packets_count(&self, count: u64) { + self.metrics + 
.tpu_ingested_packets_count + .fetch_add(count, Ordering::Relaxed); + } + + pub fn accumulate_tpu_buffered_packets_count(&self, count: u64) { + self.metrics + .tpu_buffered_packets_count + .fetch_add(count, Ordering::Relaxed); + } + + pub fn accumulated_verified_txs_count(&self, count: u64) { + self.metrics + .verified_txs_count + .fetch_add(count, Ordering::Relaxed); + } + + pub fn accumulated_processed_txs_count(&self, count: u64) { + self.metrics + .processed_txs_count + .fetch_add(count, Ordering::Relaxed); + } + + pub fn accumulated_retryable_txs_count(&self, count: u64) { + self.metrics + .retryable_txs_count + .fetch_add(count, Ordering::Relaxed); + } + fn reporting_loop( running_flag: Arc, metrics: Arc, - reporting_duration_ms: u64, + report_receiver: Receiver>, ) { while running_flag.load(Ordering::Relaxed) { - metrics.report(reporting_duration_ms); + for bank in report_receiver.try_iter() { + metrics.report(bank.slot()); + } thread::sleep(Duration::from_millis(100)); } } @@ -161,21 +210,97 @@ impl QosService { #[derive(Default)] struct QosServiceMetrics { - last_report: AtomicInterval, + // banking_stage creates one QosService instance per working threads, that is uniquely + // identified by id. This field allows to categorize metrics for gossip votes, TPU votes + // and other transactions. + id: u32, + + // aggregate metrics per slot + slot: AtomicU64, + + // accumulated number of live packets TPU received from verified receiver for processing. + tpu_ingested_packets_count: AtomicU64, + + // accumulated number of live packets TPU put into buffer due to no active bank. 
+ tpu_buffered_packets_count: AtomicU64, + + // accumulated number of verified txs, which excludes unsanitized transactions and + // non-vote transactions when in vote-only mode from ingested packets + verified_txs_count: AtomicU64, + + // accumulated number of transactions been processed, includes those landed and those to be + // returned (due to AccountInUse, and other QoS related reasons) + processed_txs_count: AtomicU64, + + // accumulated number of transactions buffered for retry, often due to AccountInUse and QoS + // reasons, includes retried_txs_per_block_limit_count and retried_txs_per_account_limit_count + retryable_txs_count: AtomicU64, + + // accumulated time in micro-sec spent in computing transaction cost. It is the main performance + // overhead introduced by cost_model compute_cost_time: AtomicU64, + + // total number of transactions in the reporting period to be computed for their cost. It is + // usually the number of sanitized transactions leader receives. compute_cost_count: AtomicU64, + + // accumulated time in micro-sec spent in tracking each bank's cost. 
It is the second part of + // overhead introduced cost_tracking_time: AtomicU64, + + // number of transactions to be included in blocks selected_txs_count: AtomicU64, + + // number of transactions to be queued for retry due to its potential to breach block limit retried_txs_per_block_limit_count: AtomicU64, + + // number of transactions to be queued for retry due to its potential to breach writable + // account limit retried_txs_per_account_limit_count: AtomicU64, + + // number of transactions to be queued for retry due to its account data limits retried_txs_per_account_data_limit_count: AtomicU64, } impl QosServiceMetrics { - pub fn report(&self, report_interval_ms: u64) { - if self.last_report.should_update(report_interval_ms) { + pub fn new(id: u32) -> Self { + QosServiceMetrics { + id, + ..QosServiceMetrics::default() + } + } + + pub fn report(&self, bank_slot: Slot) { + if bank_slot != self.slot.load(Ordering::Relaxed) { datapoint_info!( "qos-service-stats", + ("id", self.id as i64, i64), + ("bank_slot", bank_slot as i64, i64), + ( + "tpu_ingested_packets_count", + self.tpu_ingested_packets_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "tpu_buffered_packets_count", + self.tpu_buffered_packets_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "verified_txs_count", + self.verified_txs_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "processed_txs_count", + self.processed_txs_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "retryable_txs_count", + self.retryable_txs_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), ( "compute_cost_time", self.compute_cost_time.swap(0, Ordering::Relaxed) as i64, @@ -215,6 +340,7 @@ impl QosServiceMetrics { i64 ), ); + self.slot.store(bank_slot, Ordering::Relaxed); } } } @@ -259,7 +385,7 @@ mod tests { let txs = vec![transfer_tx.clone(), vote_tx.clone(), vote_tx, transfer_tx]; let cost_model = Arc::new(RwLock::new(CostModel::default())); - let qos_service = 
QosService::new(cost_model.clone()); + let qos_service = QosService::new(cost_model.clone(), 1); let txs_costs = qos_service.compute_transaction_costs(txs.iter()); // verify the size of txs_costs and its contents @@ -307,7 +433,7 @@ mod tests { // make a vec of txs let txs = vec![transfer_tx.clone(), vote_tx.clone(), transfer_tx, vote_tx]; - let qos_service = QosService::new(cost_model); + let qos_service = QosService::new(cost_model, 1); let txs_costs = qos_service.compute_transaction_costs(txs.iter()); // set cost tracker limit to fit 1 transfer tx, vote tx bypasses limit check @@ -324,83 +450,4 @@ mod tests { assert!(results[2].is_err()); assert!(results[3].is_ok()); } - - #[test] - fn test_async_report_metrics() { - solana_logger::setup(); - //solana_logger::setup_with_default("solana=info"); - - // make a vec of txs - let txs_count = 128usize; - let keypair = Keypair::new(); - let transfer_tx = SanitizedTransaction::from_transaction_for_tests( - system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default()), - ); - let mut txs_1 = Vec::with_capacity(txs_count); - let mut txs_2 = Vec::with_capacity(txs_count); - for _i in 0..txs_count { - txs_1.push(transfer_tx.clone()); - txs_2.push(transfer_tx.clone()); - } - - // set reporting duration to long enough so the stats wouldn't reset during testing - let ten_min = 600_000u64; - let cost_model = Arc::new(RwLock::new(CostModel::default())); - let qos_service = Arc::new(QosService::new_with_reporting_duration(cost_model, ten_min)); - let qos_service_1 = qos_service.clone(); - let qos_service_2 = qos_service.clone(); - - let th_1 = Builder::new() - .name("test-producer-1".to_string()) - .spawn(move || { - debug!("thread 1 starts with {} txs", txs_1.len()); - let tx_costs = qos_service_1.compute_transaction_costs(txs_1.iter()); - assert_eq!(txs_count, tx_costs.len()); - debug!( - "thread 1 done, generated {} count, see service count as {}", - txs_count, - qos_service_1 - .metrics - .compute_cost_count - 
.load(Ordering::Relaxed) - ); - }) - .unwrap(); - - let th_2 = Builder::new() - .name("test-producer-2".to_string()) - .spawn(move || { - debug!("thread 2 starts with {} txs", txs_2.len()); - let tx_costs = qos_service_2.compute_transaction_costs(txs_2.iter()); - assert_eq!(txs_count, tx_costs.len()); - debug!( - "thread 2 done, generated {} count, see service count as {}", - txs_count, - qos_service_2 - .metrics - .compute_cost_count - .load(Ordering::Relaxed) - ); - }) - .unwrap(); - - th_1.join().expect("qos service 1 panicked"); - th_2.join().expect("qos service 2 panicked"); - - debug!( - "all threads joined. count {}", - qos_service - .metrics - .compute_cost_count - .load(Ordering::Relaxed) - ); - - assert_eq!( - txs_count as u64 * 2, - qos_service - .metrics - .compute_cost_count - .load(Ordering::Relaxed) - ); - } }