From 4f86c0b74a820cffd670f259ab5f9f9232896b9d Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Thu, 8 Aug 2019 17:05:06 -0400 Subject: [PATCH] Rate limit transaction counters (#5447) * Rate limit transaction counters * @sakridge feedback * Set default high metrics rate for multinode demo * Fix tests * Swap defaults and fix env var tests * Only set metrics rate if not already set --- core/src/banking_stage.rs | 22 +-- core/src/cluster_info.rs | 2 +- core/src/poh_recorder.rs | 24 ++-- core/src/replay_stage.rs | 2 +- core/src/window_service.rs | 9 +- metrics/src/counter.rs | 134 ++++++++++++++++-- multinode-demo/common.sh | 4 + .../exchange_api/src/exchange_processor.rs | 6 +- runtime/src/bank.rs | 58 +++----- 9 files changed, 174 insertions(+), 87 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 9a0f5bbfaa..736dd3288c 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -228,10 +228,13 @@ impl BankingStage { (new_tx_count as f32) / (proc_start.as_s()) ); - inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets); - inc_new_counter_info!("banking_stage-consumed_buffered_packets", new_tx_count); - inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count); - inc_new_counter_debug!("banking_stage-dropped_batches_count", dropped_batches_count); + inc_new_high_rate_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets); + inc_new_high_rate_counter_info!("banking_stage-consumed_buffered_packets", new_tx_count); + inc_new_high_rate_counter_debug!("banking_stage-process_transactions", new_tx_count); + inc_new_high_rate_counter_debug!( + "banking_stage-dropped_batches_count", + dropped_batches_count + ); Ok(unprocessed_packets) } @@ -810,7 +813,7 @@ impl BankingStage { count, id, ); - inc_new_counter_debug!("banking_stage-transactions_received", count); + inc_new_high_rate_counter_debug!("banking_stage-transactions_received", count); let mut proc_start = Measure::start("process_received_packets_process"); let mut new_tx_count = 0; @@ -882,9 +885,12 @@ impl BankingStage { count, id, ); - inc_new_counter_debug!("banking_stage-process_packets", count); - inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count); - inc_new_counter_debug!("banking_stage-dropped_batches_count", dropped_batches_count); + inc_new_high_rate_counter_debug!("banking_stage-process_packets", count); + inc_new_high_rate_counter_debug!("banking_stage-process_transactions", new_tx_count); + inc_new_high_rate_counter_debug!( + "banking_stage-dropped_batches_count", + dropped_batches_count + ); *recv_start = Instant::now(); diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 0442800170..eb5bf10893 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1155,7 +1155,7 @@ impl ClusterInfo { stakes: &HashMap, ) -> Vec { let self_id = me.read().unwrap().gossip.id; - inc_new_counter_debug!("cluster_info-push_message", 1, 0, 1000); + inc_new_high_rate_counter_debug!("cluster_info-push_message", 1); let updated: Vec<_> = me.write() diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index aa15367779..5cb5bfdcfe 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -296,11 +296,9 @@ impl PohRecorder { pub fn tick(&mut self) { let now = Instant::now(); let poh_entry = self.poh.lock().unwrap().tick(); - inc_new_counter_warn!( + inc_new_high_rate_counter_warn!( "poh_recorder-tick_lock_contention", - timing::duration_as_ms(&now.elapsed()) as usize, - 0, - 1000 + timing::duration_as_ms(&now.elapsed()) as usize ); let now = Instant::now(); if let Some(poh_entry) = poh_entry { @@ -308,11 +306,9 @@ impl PohRecorder { trace!("tick {}", self.tick_height); if self.start_leader_at_tick.is_none() { - inc_new_counter_warn!( + inc_new_high_rate_counter_warn!( "poh_recorder-tick_overhead", - timing::duration_as_ms(&now.elapsed()) as usize, - 0, - 1000 + timing::duration_as_ms(&now.elapsed()) as usize ); return; } @@ -326,11 +322,9 @@ impl PohRecorder { self.tick_cache.push((entry, self.tick_height)); let _ = self.flush_cache(true); } - inc_new_counter_warn!( + inc_new_high_rate_counter_warn!( "poh_recorder-tick_overhead", - timing::duration_as_ms(&now.elapsed()) as usize, - 0, - 1000 + timing::duration_as_ms(&now.elapsed()) as usize ); } @@ -356,11 +350,9 @@ impl PohRecorder { let now = Instant::now(); if let Some(poh_entry) = self.poh.lock().unwrap().record(mixin) { - inc_new_counter_warn!( + inc_new_high_rate_counter_warn!( "poh_recorder-record_lock_contention", - timing::duration_as_ms(&now.elapsed()) as usize, - 0, - 1000 + timing::duration_as_ms(&now.elapsed()) as usize ); let entry = Entry { num_hashes: poh_entry.num_hashes, diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 8232df0b8d..063812f1a4 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -556,7 +556,7 @@ impl ReplayStage { ); } } - inc_new_counter_info!("replay_stage-replay_transactions", tx_count); + inc_new_high_rate_counter_info!("replay_stage-replay_transactions", tx_count); did_complete_bank } diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 55514273e1..c9cf041a97 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -39,12 +39,7 @@ fn retransmit_blobs(blobs: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey) } if !retransmit_queue.is_empty() { - inc_new_counter_debug!( - "streamer-recv_window-retransmit", - retransmit_queue.len(), - 0, - 1000 - ); + inc_new_high_rate_counter_debug!("streamer-recv_window-retransmit", retransmit_queue.len()); retransmit.send(retransmit_queue)?; } Ok(()) @@ -118,7 +113,7 @@ where blobs.append(&mut blob) } let now = Instant::now(); - inc_new_counter_debug!("streamer-recv_window-recv", blobs.len(), 0, 1000); + inc_new_high_rate_counter_debug!("streamer-recv_window-recv", blobs.len()); let blobs: Vec<_> = thread_pool.install(|| { blobs diff --git a/metrics/src/counter.rs b/metrics/src/counter.rs index 1f06db1ca6..d91278ac9b 100644 --- a/metrics/src/counter.rs +++ b/metrics/src/counter.rs @@ -6,6 +6,10 @@ use std::sync::atomic::{AtomicUsize, Ordering}; const DEFAULT_LOG_RATE: usize = 1000; const DEFAULT_METRICS_RATE: usize = 1; +const DEFAULT_METRICS_HIGH_RATE: usize = 10; + +/// Use default metrics high rate +pub const HIGH_RATE: usize = 999_999; pub struct Counter { pub name: &'static str, @@ -125,7 +129,69 @@ macro_rules! inc_new_counter_debug { }}; } +#[macro_export] +macro_rules! inc_new_high_rate_counter_error { + ($name:expr, $count:expr) => {{ + inc_new_counter!( + $name, + $count, + log::Level::Error, + 0, + $crate::counter::HIGH_RATE + ); + }}; +} + +#[macro_export] +macro_rules! inc_new_high_rate_counter_warn { + ($name:expr, $count:expr) => {{ + inc_new_counter!( + $name, + $count, + log::Level::Warn, + 0, + $crate::counter::HIGH_RATE + ); + }}; +} + +#[macro_export] +macro_rules! inc_new_high_rate_counter_info { + ($name:expr, $count:expr) => {{ + inc_new_counter!( + $name, + $count, + log::Level::Info, + 0, + $crate::counter::HIGH_RATE + ); + }}; +} + +#[macro_export] +macro_rules! inc_new_high_rate_counter_debug { + ($name:expr, $count:expr) => {{ + inc_new_counter!( + $name, + $count, + log::Level::Debug, + 0, + $crate::counter::HIGH_RATE + ); + }}; +} + impl Counter { + fn default_metrics_high_rate() -> usize { + let v = env::var("SOLANA_METRICS_HIGH_RATE") + .map(|x| x.parse().unwrap_or(0)) + .unwrap_or(0); + if v == 0 { + DEFAULT_METRICS_HIGH_RATE + } else { + v + } + } fn default_log_rate() -> usize { let v = env::var("SOLANA_DEFAULT_LOG_RATE") .map(|x| x.parse().unwrap_or(DEFAULT_LOG_RATE)) @@ -142,20 +208,22 @@ impl Counter { .add_field("count", influxdb::Value::Integer(0)) .to_owned(), ); + self.lograte + .compare_and_swap(0, Self::default_log_rate(), Ordering::Relaxed); + self.metricsrate.compare_and_swap( + HIGH_RATE, + Self::default_metrics_high_rate(), + Ordering::Relaxed, + ); + self.metricsrate + .compare_and_swap(0, DEFAULT_METRICS_RATE, Ordering::Relaxed); } pub fn inc(&mut self, level: log::Level, events: usize) { let counts = self.counts.fetch_add(events, Ordering::Relaxed); let times = self.times.fetch_add(1, Ordering::Relaxed); - let mut lograte = self.lograte.load(Ordering::Relaxed); - if lograte == 0 { - lograte = Self::default_log_rate(); - self.lograte.store(lograte, Ordering::Relaxed); - } - let mut metricsrate = self.metricsrate.load(Ordering::Relaxed); - if metricsrate == 0 { - metricsrate = DEFAULT_METRICS_RATE; - self.metricsrate.store(metricsrate, Ordering::Relaxed); - } + let lograte = self.lograte.load(Ordering::Relaxed); + let metricsrate = self.metricsrate.load(Ordering::Relaxed); + if times % lograte == 0 && times > 0 && log_enabled!(level) { log!(level, "COUNTER:{{\"name\": \"{}\", \"counts\": {}, \"samples\": {}, \"now\": {}, \"events\": {}}}", @@ -191,7 +259,7 @@ impl Counter { } #[cfg(test)] mod tests { - use crate::counter::{Counter, DEFAULT_LOG_RATE}; + use crate::counter::{Counter, DEFAULT_LOG_RATE, DEFAULT_METRICS_HIGH_RATE, HIGH_RATE}; use log::Level; use log::*; use serial_test_derive::serial; @@ -219,6 +287,9 @@ mod tests { .ok(); let _readlock = get_env_lock().read(); static mut COUNTER: Counter = create_counter!("test", 1000, 1); + unsafe { + COUNTER.init(); + } let count = 1; inc_counter!(COUNTER, Level::Info, count); unsafe { @@ -239,6 +310,40 @@ mod tests { assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 399); } } + + #[test] + #[serial] + fn test_high_rate_counter() { + env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("solana=info")) + .try_init() + .ok(); + let _readlock = get_env_lock().read(); + static mut COUNTER: Counter = create_counter!("test", 1000, HIGH_RATE); + env::remove_var("SOLANA_METRICS_HIGH_RATE"); + unsafe { + COUNTER.init(); + assert_eq!( + COUNTER.metricsrate.load(Ordering::Relaxed), + DEFAULT_METRICS_HIGH_RATE + ); + } + } + + #[test] + #[serial] + fn test_high_rate_counter_env() { + env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("solana=info")) + .try_init() + .ok(); + let _writelock = get_env_lock().write(); + static mut COUNTER: Counter = create_counter!("test", 1000, HIGH_RATE); + env::set_var("SOLANA_METRICS_HIGH_RATE", "50"); + unsafe { + COUNTER.init(); + assert_eq!(COUNTER.metricsrate.load(Ordering::Relaxed), 50); + } + } + #[test] #[serial] fn test_inc_new_counter() { @@ -249,6 +354,7 @@ mod tests { inc_new_counter_info!("2", 1, 3); inc_new_counter_info!("3", 1, 2, 1); } + #[test] #[serial] fn test_lograte() { @@ -264,8 +370,8 @@ mod tests { DEFAULT_LOG_RATE, ); static mut COUNTER: Counter = create_counter!("test_lograte", 0, 1); - inc_counter!(COUNTER, Level::Error, 2); unsafe { + COUNTER.init(); assert_eq!(COUNTER.lograte.load(Ordering::Relaxed), DEFAULT_LOG_RATE); } } @@ -280,15 +386,15 @@ mod tests { let _writelock = get_env_lock().write(); static mut COUNTER: Counter = create_counter!("test_lograte_env", 0, 1); env::set_var("SOLANA_DEFAULT_LOG_RATE", "50"); - inc_counter!(COUNTER, Level::Error, 2); unsafe { + COUNTER.init(); assert_eq!(COUNTER.lograte.load(Ordering::Relaxed), 50); } static mut COUNTER2: Counter = create_counter!("test_lograte_env", 0, 1); env::set_var("SOLANA_DEFAULT_LOG_RATE", "0"); - inc_counter!(COUNTER2, Level::Error, 2); unsafe { + COUNTER2.init(); assert_eq!(COUNTER2.lograte.load(Ordering::Relaxed), DEFAULT_LOG_RATE); } } diff --git a/multinode-demo/common.sh b/multinode-demo/common.sh index b1a0de5908..78cae245e9 100644 --- a/multinode-demo/common.sh +++ b/multinode-demo/common.sh @@ -60,6 +60,10 @@ solana_replicator=$(solana_program replicator) export RUST_BACKTRACE=1 +if [[ -z $SOLANA_METRICS_HIGH_RATE ]]; then + export SOLANA_METRICS_HIGH_RATE=1000 +fi + # shellcheck source=scripts/configure-metrics.sh source "$SOLANA_ROOT"/scripts/configure-metrics.sh diff --git a/programs/exchange_api/src/exchange_processor.rs b/programs/exchange_api/src/exchange_processor.rs index e0acdb6634..c15635cc23 100644 --- a/programs/exchange_api/src/exchange_processor.rs +++ b/programs/exchange_api/src/exchange_processor.rs @@ -4,7 +4,7 @@ use crate::exchange_instruction::*; use crate::exchange_state::*; use crate::faucet; use log::*; -use solana_metrics::inc_new_counter_info; +use solana_metrics::inc_new_high_rate_counter_info; use solana_sdk::account::KeyedAccount; use solana_sdk::instruction::InstructionError; use solana_sdk::pubkey::Pubkey; @@ -296,7 +296,7 @@ impl ExchangeProcessor { // Trade holds the tokens in escrow account.tokens[from_token] -= info.tokens; - inc_new_counter_info!("exchange_processor-trades", 1, 1000, 1000); + inc_new_high_rate_counter_info!("exchange_processor-trades", 1); Self::serialize( &ExchangeState::Trade(OrderInfo { @@ -390,7 +390,7 @@ impl ExchangeProcessor { Err(e)? } - inc_new_counter_info!("exchange_processor-swaps", 1, 1000, 1000); + inc_new_high_rate_counter_info!("exchange_processor-swaps", 1); if to_order.tokens == 0 { // Turn into token account diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index c1dd844a5a..0adfdd5ab3 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -25,7 +25,7 @@ use log::*; use serde::{Deserialize, Serialize}; use solana_measure::measure::Measure; use solana_metrics::{ - datapoint_info, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_info, + datapoint_info, inc_new_counter_debug, inc_new_counter_error, inc_new_high_rate_counter_info, }; use solana_sdk::account::Account; use solana_sdk::fee_calculator::FeeCalculator; @@ -856,59 +856,45 @@ impl Bank { fn update_error_counters(error_counters: &ErrorCounters) { if 0 != error_counters.blockhash_not_found { - inc_new_counter_error!( + inc_new_high_rate_counter_error!( "bank-process_transactions-error-blockhash_not_found", - error_counters.blockhash_not_found, - 0, - 1000 + error_counters.blockhash_not_found ); } if 0 != error_counters.invalid_account_index { - inc_new_counter_error!( + inc_new_high_rate_counter_error!( "bank-process_transactions-error-invalid_account_index", - error_counters.invalid_account_index, - 0, - 1000 + error_counters.invalid_account_index ); } if 0 != error_counters.reserve_blockhash { - inc_new_counter_error!( + inc_new_high_rate_counter_error!( "bank-process_transactions-error-reserve_blockhash", - error_counters.reserve_blockhash, - 0, - 1000 + error_counters.reserve_blockhash ); } if 0 != error_counters.duplicate_signature { - inc_new_counter_error!( + inc_new_high_rate_counter_error!( "bank-process_transactions-error-duplicate_signature", - error_counters.duplicate_signature, - 0, - 1000 + error_counters.duplicate_signature ); } if 0 != error_counters.invalid_account_for_fee { - inc_new_counter_error!( + inc_new_high_rate_counter_error!( "bank-process_transactions-error-invalid_account_for_fee", - error_counters.invalid_account_for_fee, - 0, - 1000 + error_counters.invalid_account_for_fee ); } if 0 != error_counters.insufficient_funds { - inc_new_counter_error!( + inc_new_high_rate_counter_error!( "bank-process_transactions-error-insufficient_funds", - error_counters.insufficient_funds, - 0, - 1000 + error_counters.insufficient_funds ); } if 0 != error_counters.account_loaded_twice { - inc_new_counter_error!( + inc_new_high_rate_counter_error!( "bank-process_transactions-account_loaded_twice", - error_counters.account_loaded_twice, - 0, - 1000 + error_counters.account_loaded_twice ); } } @@ -927,7 +913,7 @@ impl Bank { usize, ) { debug!("processing transactions: {}", txs.len()); - inc_new_counter_info!("bank-process_transactions", txs.len()); + inc_new_high_rate_counter_info!("bank-process_transactions", txs.len()); let mut error_counters = ErrorCounters::default(); let mut load_time = Measure::start("accounts_load"); @@ -988,13 +974,11 @@ impl Bank { } if err_count > 0 { debug!("{} errors of {} txs", err_count, err_count + tx_count); - inc_new_counter_error!( + inc_new_high_rate_counter_error!( "bank-process_transactions-account_not_found", - error_counters.account_not_found, - 0, - 1000 + error_counters.account_not_found ); - inc_new_counter_error!("bank-process_transactions-error_count", err_count, 0, 1000); + inc_new_counter_error!("bank-process_transactions-error_count", err_count); } Self::update_error_counters(&error_counters); @@ -1066,8 +1050,8 @@ impl Bank { self.increment_transaction_count(tx_count); self.increment_signature_count(signature_count); - inc_new_counter_info!("bank-process_transactions-txs", tx_count, 0, 1000); - inc_new_counter_info!("bank-process_transactions-sigs", signature_count, 0, 1000); + inc_new_high_rate_counter_info!("bank-process_transactions-txs", tx_count); + inc_new_high_rate_counter_info!("bank-process_transactions-sigs", signature_count); if executed.iter().any(|res| Self::can_commit(res)) { self.is_delta.store(true, Ordering::Relaxed);