QosService inline metrics reporting (#32670)

This commit is contained in:
Andrew Fitzgerald 2023-08-01 10:19:02 -07:00 committed by GitHub
parent 7afefd9e1b
commit 727cca5d20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 28 additions and 93 deletions

View File

@ -5,7 +5,6 @@
use {
super::{committer::CommitTransactionDetails, BatchedTransactionDetails},
crossbeam_channel::{unbounded, Receiver, Sender},
solana_cost_model::{cost_model::CostModel, transaction_cost::TransactionCost},
solana_measure::measure::Measure,
solana_runtime::bank::Bank,
@ -15,70 +14,25 @@ use {
saturating_add_assign,
transaction::{self, SanitizedTransaction, TransactionError},
},
std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
pub enum QosMetrics {
BlockBatchUpdate { slot: Slot },
}
// QosService is local to each banking thread; each instance of QosService provides services to
// one banking thread.
// It hosts a private thread for async metrics reporting, tagged with banking threads ID. Banking
// thread calls `report_metrics(slot)` at end of `process_and_record_transaction()`, or any time
// it wants, QosService sends `slot` to reporting thread via channel, signalling stats to be
// reported if new bank slot has changed.
// The banking thread calls `report_metrics(slot)` at the end of
// `process_and_record_transaction()`, or at any other time it wants; the metrics are reported
// inline on the calling thread.
//
pub struct QosService {
// QosService hosts metrics object and a private reporting thread, as well as sender to
// communicate with thread.
report_sender: Sender<QosMetrics>,
metrics: Arc<QosServiceMetrics>,
// metrics reporting runs on a private thread
reporting_thread: Option<JoinHandle<()>>,
running_flag: Arc<AtomicBool>,
}
impl Drop for QosService {
fn drop(&mut self) {
self.running_flag.store(false, Ordering::Relaxed);
self.reporting_thread
.take()
.unwrap()
.join()
.expect("qos service metrics reporting thread failed to join");
}
metrics: QosServiceMetrics,
}
impl QosService {
pub fn new(id: u32) -> Self {
let (report_sender, report_receiver) = unbounded();
let running_flag = Arc::new(AtomicBool::new(true));
let metrics = Arc::new(QosServiceMetrics::new(id));
let running_flag_clone = Arc::clone(&running_flag);
let metrics_clone = Arc::clone(&metrics);
let reporting_thread = Some(
Builder::new()
.name("solQosSvcMetr".to_string())
.spawn(move || {
Self::reporting_loop(running_flag_clone, metrics_clone, report_receiver);
})
.unwrap(),
);
Self {
metrics,
reporting_thread,
running_flag,
report_sender,
metrics: QosServiceMetrics::new(id),
}
}
@ -266,9 +220,7 @@ impl QosService {
// metrics are reported by bank slot
pub fn report_metrics(&self, slot: Slot) {
self.report_sender
.send(QosMetrics::BlockBatchUpdate { slot })
.unwrap_or_else(|err| warn!("qos service report metrics failed: {:?}", err));
self.metrics.report(slot);
}
fn accumulate_estimated_transaction_costs(
@ -436,23 +388,6 @@ impl QosService {
});
batched_transaction_details
}
fn reporting_loop(
running_flag: Arc<AtomicBool>,
metrics: Arc<QosServiceMetrics>,
report_receiver: Receiver<QosMetrics>,
) {
while running_flag.load(Ordering::Relaxed) {
for qos_metrics in report_receiver.try_iter() {
match qos_metrics {
QosMetrics::BlockBatchUpdate { slot: bank_slot } => {
metrics.report(bank_slot);
}
}
}
thread::sleep(Duration::from_millis(100));
}
}
}
#[derive(Debug, Default)]
@ -541,109 +476,109 @@ impl QosServiceMetrics {
if bank_slot != self.slot.load(Ordering::Relaxed) {
datapoint_info!(
"qos-service-stats",
("id", self.id as i64, i64),
("bank_slot", bank_slot as i64, i64),
("id", self.id, i64),
("bank_slot", bank_slot, i64),
(
"compute_cost_time",
self.stats.compute_cost_time.swap(0, Ordering::Relaxed) as i64,
self.stats.compute_cost_time.swap(0, Ordering::Relaxed),
i64
),
(
"compute_cost_count",
self.stats.compute_cost_count.swap(0, Ordering::Relaxed) as i64,
self.stats.compute_cost_count.swap(0, Ordering::Relaxed),
i64
),
(
"cost_tracking_time",
self.stats.cost_tracking_time.swap(0, Ordering::Relaxed) as i64,
self.stats.cost_tracking_time.swap(0, Ordering::Relaxed),
i64
),
(
"selected_txs_count",
self.stats.selected_txs_count.swap(0, Ordering::Relaxed) as i64,
self.stats.selected_txs_count.swap(0, Ordering::Relaxed),
i64
),
(
"estimated_signature_cu",
self.stats.estimated_signature_cu.swap(0, Ordering::Relaxed) as i64,
self.stats.estimated_signature_cu.swap(0, Ordering::Relaxed),
i64
),
(
"estimated_write_lock_cu",
self.stats
.estimated_write_lock_cu
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"estimated_data_bytes_cu",
self.stats
.estimated_data_bytes_cu
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"estimated_builtins_execute_cu",
self.stats
.estimated_builtins_execute_cu
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"estimated_bpf_execute_cu",
self.stats
.estimated_bpf_execute_cu
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"actual_bpf_execute_cu",
self.stats.actual_bpf_execute_cu.swap(0, Ordering::Relaxed) as i64,
self.stats.actual_bpf_execute_cu.swap(0, Ordering::Relaxed),
i64
),
(
"actual_execute_time_us",
self.stats.actual_execute_time_us.swap(0, Ordering::Relaxed) as i64,
self.stats.actual_execute_time_us.swap(0, Ordering::Relaxed),
i64
),
);
datapoint_info!(
"qos-service-errors",
("id", self.id as i64, i64),
("bank_slot", bank_slot as i64, i64),
("id", self.id, i64),
("bank_slot", bank_slot, i64),
(
"retried_txs_per_block_limit_count",
self.errors
.retried_txs_per_block_limit_count
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"retried_txs_per_vote_limit_count",
self.errors
.retried_txs_per_vote_limit_count
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"retried_txs_per_account_limit_count",
self.errors
.retried_txs_per_account_limit_count
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"retried_txs_per_account_data_block_limit_count",
self.errors
.retried_txs_per_account_data_block_limit_count
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"dropped_txs_per_account_data_total_limit_count",
self.errors
.dropped_txs_per_account_data_total_limit_count
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
);