Make thread id a tag instead of field in BankingStage metrics (#644)

Making the id a tag instead of a field will allow group-by operations on
id in Chronograf.
This commit is contained in:
steviez 2024-04-08 13:15:52 -05:00 committed by GitHub
parent 0af9aaa573
commit 191a997c5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 55 additions and 49 deletions

View File

@ -88,7 +88,7 @@ const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10);
#[derive(Debug, Default)]
pub struct BankingStageStats {
last_report: AtomicInterval,
id: u32,
id: String,
receive_and_buffer_packets_count: AtomicUsize,
dropped_packets_count: AtomicUsize,
pub(crate) dropped_duplicated_packets_count: AtomicUsize,
@ -113,7 +113,7 @@ pub struct BankingStageStats {
impl BankingStageStats {
pub fn new(id: u32) -> Self {
BankingStageStats {
id,
id: id.to_string(),
batch_packet_indexes_len: Histogram::configure()
.max_value(PACKETS_PER_BATCH as u64)
.build()
@ -157,7 +157,7 @@ impl BankingStageStats {
if self.last_report.should_update(report_interval_ms) {
datapoint_info!(
"banking_stage-loop-stats",
("id", self.id, i64),
"id" => self.id,
(
"receive_and_buffer_packets_count",
self.receive_and_buffer_packets_count

View File

@ -169,7 +169,7 @@ fn try_drain_iter<T>(work: T, receiver: &Receiver<T>) -> impl Iterator<Item = T>
/// since the consume worker thread is sleeping unless there is work to be
/// done.
pub(crate) struct ConsumeWorkerMetrics {
id: u32,
id: String,
interval: AtomicInterval,
has_data: AtomicBool,
@ -185,15 +185,15 @@ impl ConsumeWorkerMetrics {
if self.interval.should_update(REPORT_INTERVAL_MS)
&& self.has_data.swap(false, Ordering::Relaxed)
{
self.count_metrics.report_and_reset(self.id);
self.timing_metrics.report_and_reset(self.id);
self.error_metrics.report_and_reset(self.id);
self.count_metrics.report_and_reset(&self.id);
self.timing_metrics.report_and_reset(&self.id);
self.error_metrics.report_and_reset(&self.id);
}
}
fn new(id: u32) -> Self {
Self {
id,
id: id.to_string(),
interval: AtomicInterval::default(),
has_data: AtomicBool::new(false),
count_metrics: ConsumeWorkerCountMetrics::default(),
@ -428,10 +428,10 @@ impl Default for ConsumeWorkerCountMetrics {
}
impl ConsumeWorkerCountMetrics {
fn report_and_reset(&self, id: u32) {
fn report_and_reset(&self, id: &str) {
datapoint_info!(
"banking_stage_worker_counts",
("id", id, i64),
"id" => id,
(
"transactions_attempted_execution_count",
self.transactions_attempted_execution_count
@ -495,10 +495,10 @@ struct ConsumeWorkerTimingMetrics {
}
impl ConsumeWorkerTimingMetrics {
fn report_and_reset(&self, id: u32) {
fn report_and_reset(&self, id: &str) {
datapoint_info!(
"banking_stage_worker_timing",
("id", id, i64),
"id" => id,
(
"cost_model_us",
self.cost_model_us.swap(0, Ordering::Relaxed),
@ -573,10 +573,10 @@ struct ConsumeWorkerTransactionErrorMetrics {
}
impl ConsumeWorkerTransactionErrorMetrics {
fn report_and_reset(&self, id: u32) {
fn report_and_reset(&self, id: &str) {
datapoint_info!(
"banking_stage_worker_error_metrics",
("id", id, i64),
"id" => id,
("total", self.total.swap(0, Ordering::Relaxed), i64),
(
"account_in_use",

View File

@ -84,10 +84,10 @@ impl LeaderPrioritizationFeesMetrics {
}
}
fn report(&self, id: u32, slot: Slot) {
fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_prioritization_fees_info",
("id", id, i64),
"id" => id,
("slot", slot, i64),
(
"min_prioritization_fees_per_cu",
@ -199,10 +199,10 @@ impl LeaderSlotPacketCountMetrics {
Self::default()
}
fn report(&self, id: u32, slot: Slot) {
fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_packet_counts",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
(
"total_new_valid_packets",
@ -328,7 +328,7 @@ pub(crate) struct LeaderSlotMetrics {
// banking_stage creates one QosService instance per working threads, that is uniquely
// identified by id. This field allows to categorize metrics for gossip votes, TPU votes
// and other transactions.
id: u32,
id: String,
// aggregate metrics per slot
slot: Slot,
@ -355,7 +355,7 @@ impl LeaderSlotMetrics {
unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>,
) -> Self {
Self {
id,
id: id.to_string(),
slot,
packet_count_metrics: LeaderSlotPacketCountMetrics::new(),
transaction_error_metrics: TransactionErrorMetrics::new(),
@ -371,11 +371,11 @@ impl LeaderSlotMetrics {
pub(crate) fn report(&mut self) {
self.is_reported = true;
self.timing_metrics.report(self.id, self.slot);
self.transaction_error_metrics.report(self.id, self.slot);
self.packet_count_metrics.report(self.id, self.slot);
self.vote_packet_count_metrics.report(self.id, self.slot);
self.prioritization_fees_metric.report(self.id, self.slot);
self.timing_metrics.report(&self.id, self.slot);
self.transaction_error_metrics.report(&self.id, self.slot);
self.packet_count_metrics.report(&self.id, self.slot);
self.vote_packet_count_metrics.report(&self.id, self.slot);
self.prioritization_fees_metric.report(&self.id, self.slot);
}
/// Returns `Some(self.slot)` if the metrics have been reported, otherwise returns None
@ -408,10 +408,10 @@ impl VotePacketCountMetrics {
Self::default()
}
fn report(&self, id: u32, slot: Slot) {
fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-vote_packet_counts",
("id", id, i64),
"id" => id,
("slot", slot, i64),
("dropped_gossip_votes", self.dropped_gossip_votes, i64),
("dropped_tpu_votes", self.dropped_tpu_votes, i64)

View File

@ -32,10 +32,10 @@ impl LeaderExecuteAndCommitTimings {
self.execute_timings.accumulate(&other.execute_timings);
}
pub fn report(&self, id: u32, slot: Slot) {
pub fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_execute_and_commit_timings",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
("collect_balances_us", self.collect_balances_us as i64, i64),
("load_execute_us", self.load_execute_us as i64, i64),
@ -52,7 +52,7 @@ impl LeaderExecuteAndCommitTimings {
datapoint_info!(
"banking_stage-leader_slot_record_timings",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
(
"execution_results_to_transactions_us",
@ -96,7 +96,7 @@ impl LeaderSlotTimingMetrics {
}
}
pub(crate) fn report(&self, id: u32, slot: Slot) {
pub(crate) fn report(&self, id: &str, slot: Slot) {
self.outer_loop_timings.report(id, slot);
self.process_buffered_packets_timings.report(id, slot);
self.consume_buffered_packets_timings.report(id, slot);
@ -148,10 +148,10 @@ impl OuterLoopTimings {
self.bank_detected_time.elapsed().as_micros() as u64;
}
fn report(&self, id: u32, slot: Slot) {
fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_loop_timings",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
(
"bank_detected_to_slot_end_detected_us",
@ -191,10 +191,10 @@ pub(crate) struct ProcessBufferedPacketsTimings {
pub forward_and_hold_us: u64,
}
impl ProcessBufferedPacketsTimings {
fn report(&self, id: u32, slot: Slot) {
fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_process_buffered_packets_timings",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
("make_decision_us", self.make_decision_us as i64, i64),
(
@ -215,10 +215,10 @@ pub(crate) struct ConsumeBufferedPacketsTimings {
}
impl ConsumeBufferedPacketsTimings {
fn report(&self, id: u32, slot: Slot) {
fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_consume_buffered_packets_timings",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
(
"process_packets_transactions_us",
@ -247,10 +247,10 @@ pub(crate) struct ProcessPacketsTimings {
}
impl ProcessPacketsTimings {
fn report(&self, id: u32, slot: Slot) {
fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_process_packets_timings",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
(
"transactions_from_packets_us",

View File

@ -391,7 +391,7 @@ struct QosServiceMetrics {
/// banking_stage creates one QosService instance per working threads, that is uniquely
/// identified by id. This field allows to categorize metrics for gossip votes, TPU votes
/// and other transactions.
id: u32,
id: String,
/// aggregate metrics per slot
slot: AtomicU64,
@ -460,7 +460,7 @@ struct QosServiceMetricsErrors {
impl QosServiceMetrics {
pub fn new(id: u32) -> Self {
QosServiceMetrics {
id,
id: id.to_string(),
..QosServiceMetrics::default()
}
}
@ -469,7 +469,7 @@ impl QosServiceMetrics {
if bank_slot != self.slot.load(Ordering::Relaxed) {
datapoint_info!(
"qos-service-stats",
("id", self.id, i64),
"id" => self.id,
("bank_slot", bank_slot, i64),
(
"compute_cost_time",
@ -532,7 +532,7 @@ impl QosServiceMetrics {
);
datapoint_info!(
"qos-service-errors",
("id", self.id, i64),
"id" => self.id,
("bank_slot", bank_slot, i64),
(
"retried_txs_per_block_limit_count",

View File

@ -24,13 +24,20 @@ pub struct ModifiableTracerPacketStats {
#[derive(Debug, Default)]
pub struct TracerPacketStats {
id: u32,
id: String,
last_report: u64,
modifiable_tracer_packet_stats: Option<ModifiableTracerPacketStats>,
}
impl TracerPacketStats {
pub fn new(id: u32) -> Self {
Self {
id: id.to_string(),
..Self::default()
}
}
fn reset(id: String) -> Self {
Self {
id,
..Self::default()
@ -116,7 +123,7 @@ impl TracerPacketStats {
{
datapoint_info!(
"tracer-packet-stats",
("id", self.id, i64),
"id" => &self.id,
(
"total_removed_before_sigverify",
modifiable_tracer_packet_stats
@ -199,8 +206,7 @@ impl TracerPacketStats {
)
);
let id = self.id;
*self = Self::new(id);
*self = Self::reset(self.id.clone());
self.last_report = timestamp();
}
}

View File

@ -88,10 +88,10 @@ impl TransactionErrorMetrics {
);
}
pub fn report(&self, id: u32, slot: Slot) {
pub fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_transaction_errors",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
("total", self.total as i64, i64),
("account_in_use", self.account_in_use as i64, i64),