Rate limit transaction counters (#5447)
* Rate limit transaction counters * @sakridge feedback * Set default high metrics rate for multinode demo * Fix tests * Swap defaults and fix env var tests * Only set metrics rate if not already set
This commit is contained in:
parent
5b4f24eabd
commit
4f86c0b74a
|
@ -228,10 +228,13 @@ impl BankingStage {
|
|||
(new_tx_count as f32) / (proc_start.as_s())
|
||||
);
|
||||
|
||||
inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets);
|
||||
inc_new_counter_info!("banking_stage-consumed_buffered_packets", new_tx_count);
|
||||
inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count);
|
||||
inc_new_counter_debug!("banking_stage-dropped_batches_count", dropped_batches_count);
|
||||
inc_new_high_rate_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets);
|
||||
inc_new_high_rate_counter_info!("banking_stage-consumed_buffered_packets", new_tx_count);
|
||||
inc_new_high_rate_counter_debug!("banking_stage-process_transactions", new_tx_count);
|
||||
inc_new_high_rate_counter_debug!(
|
||||
"banking_stage-dropped_batches_count",
|
||||
dropped_batches_count
|
||||
);
|
||||
|
||||
Ok(unprocessed_packets)
|
||||
}
|
||||
|
@ -810,7 +813,7 @@ impl BankingStage {
|
|||
count,
|
||||
id,
|
||||
);
|
||||
inc_new_counter_debug!("banking_stage-transactions_received", count);
|
||||
inc_new_high_rate_counter_debug!("banking_stage-transactions_received", count);
|
||||
let mut proc_start = Measure::start("process_received_packets_process");
|
||||
let mut new_tx_count = 0;
|
||||
|
||||
|
@ -882,9 +885,12 @@ impl BankingStage {
|
|||
count,
|
||||
id,
|
||||
);
|
||||
inc_new_counter_debug!("banking_stage-process_packets", count);
|
||||
inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count);
|
||||
inc_new_counter_debug!("banking_stage-dropped_batches_count", dropped_batches_count);
|
||||
inc_new_high_rate_counter_debug!("banking_stage-process_packets", count);
|
||||
inc_new_high_rate_counter_debug!("banking_stage-process_transactions", new_tx_count);
|
||||
inc_new_high_rate_counter_debug!(
|
||||
"banking_stage-dropped_batches_count",
|
||||
dropped_batches_count
|
||||
);
|
||||
|
||||
*recv_start = Instant::now();
|
||||
|
||||
|
|
|
@ -1155,7 +1155,7 @@ impl ClusterInfo {
|
|||
stakes: &HashMap<Pubkey, u64>,
|
||||
) -> Vec<SharedBlob> {
|
||||
let self_id = me.read().unwrap().gossip.id;
|
||||
inc_new_counter_debug!("cluster_info-push_message", 1, 0, 1000);
|
||||
inc_new_high_rate_counter_debug!("cluster_info-push_message", 1);
|
||||
|
||||
let updated: Vec<_> =
|
||||
me.write()
|
||||
|
|
|
@ -296,11 +296,9 @@ impl PohRecorder {
|
|||
pub fn tick(&mut self) {
|
||||
let now = Instant::now();
|
||||
let poh_entry = self.poh.lock().unwrap().tick();
|
||||
inc_new_counter_warn!(
|
||||
inc_new_high_rate_counter_warn!(
|
||||
"poh_recorder-tick_lock_contention",
|
||||
timing::duration_as_ms(&now.elapsed()) as usize,
|
||||
0,
|
||||
1000
|
||||
timing::duration_as_ms(&now.elapsed()) as usize
|
||||
);
|
||||
let now = Instant::now();
|
||||
if let Some(poh_entry) = poh_entry {
|
||||
|
@ -308,11 +306,9 @@ impl PohRecorder {
|
|||
trace!("tick {}", self.tick_height);
|
||||
|
||||
if self.start_leader_at_tick.is_none() {
|
||||
inc_new_counter_warn!(
|
||||
inc_new_high_rate_counter_warn!(
|
||||
"poh_recorder-tick_overhead",
|
||||
timing::duration_as_ms(&now.elapsed()) as usize,
|
||||
0,
|
||||
1000
|
||||
timing::duration_as_ms(&now.elapsed()) as usize
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
@ -326,11 +322,9 @@ impl PohRecorder {
|
|||
self.tick_cache.push((entry, self.tick_height));
|
||||
let _ = self.flush_cache(true);
|
||||
}
|
||||
inc_new_counter_warn!(
|
||||
inc_new_high_rate_counter_warn!(
|
||||
"poh_recorder-tick_overhead",
|
||||
timing::duration_as_ms(&now.elapsed()) as usize,
|
||||
0,
|
||||
1000
|
||||
timing::duration_as_ms(&now.elapsed()) as usize
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -356,11 +350,9 @@ impl PohRecorder {
|
|||
|
||||
let now = Instant::now();
|
||||
if let Some(poh_entry) = self.poh.lock().unwrap().record(mixin) {
|
||||
inc_new_counter_warn!(
|
||||
inc_new_high_rate_counter_warn!(
|
||||
"poh_recorder-record_lock_contention",
|
||||
timing::duration_as_ms(&now.elapsed()) as usize,
|
||||
0,
|
||||
1000
|
||||
timing::duration_as_ms(&now.elapsed()) as usize
|
||||
);
|
||||
let entry = Entry {
|
||||
num_hashes: poh_entry.num_hashes,
|
||||
|
|
|
@ -556,7 +556,7 @@ impl ReplayStage {
|
|||
);
|
||||
}
|
||||
}
|
||||
inc_new_counter_info!("replay_stage-replay_transactions", tx_count);
|
||||
inc_new_high_rate_counter_info!("replay_stage-replay_transactions", tx_count);
|
||||
did_complete_bank
|
||||
}
|
||||
|
||||
|
|
|
@ -39,12 +39,7 @@ fn retransmit_blobs(blobs: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey)
|
|||
}
|
||||
|
||||
if !retransmit_queue.is_empty() {
|
||||
inc_new_counter_debug!(
|
||||
"streamer-recv_window-retransmit",
|
||||
retransmit_queue.len(),
|
||||
0,
|
||||
1000
|
||||
);
|
||||
inc_new_high_rate_counter_debug!("streamer-recv_window-retransmit", retransmit_queue.len());
|
||||
retransmit.send(retransmit_queue)?;
|
||||
}
|
||||
Ok(())
|
||||
|
@ -118,7 +113,7 @@ where
|
|||
blobs.append(&mut blob)
|
||||
}
|
||||
let now = Instant::now();
|
||||
inc_new_counter_debug!("streamer-recv_window-recv", blobs.len(), 0, 1000);
|
||||
inc_new_high_rate_counter_debug!("streamer-recv_window-recv", blobs.len());
|
||||
|
||||
let blobs: Vec<_> = thread_pool.install(|| {
|
||||
blobs
|
||||
|
|
|
@ -6,6 +6,10 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
|||
|
||||
const DEFAULT_LOG_RATE: usize = 1000;
|
||||
const DEFAULT_METRICS_RATE: usize = 1;
|
||||
const DEFAULT_METRICS_HIGH_RATE: usize = 10;
|
||||
|
||||
/// Use default metrics high rate
|
||||
pub const HIGH_RATE: usize = 999_999;
|
||||
|
||||
pub struct Counter {
|
||||
pub name: &'static str,
|
||||
|
@ -125,7 +129,69 @@ macro_rules! inc_new_counter_debug {
|
|||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! inc_new_high_rate_counter_error {
|
||||
($name:expr, $count:expr) => {{
|
||||
inc_new_counter!(
|
||||
$name,
|
||||
$count,
|
||||
log::Level::Error,
|
||||
0,
|
||||
$crate::counter::HIGH_RATE
|
||||
);
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! inc_new_high_rate_counter_warn {
|
||||
($name:expr, $count:expr) => {{
|
||||
inc_new_counter!(
|
||||
$name,
|
||||
$count,
|
||||
log::Level::Warn,
|
||||
0,
|
||||
$crate::counter::HIGH_RATE
|
||||
);
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! inc_new_high_rate_counter_info {
|
||||
($name:expr, $count:expr) => {{
|
||||
inc_new_counter!(
|
||||
$name,
|
||||
$count,
|
||||
log::Level::Info,
|
||||
0,
|
||||
$crate::counter::HIGH_RATE
|
||||
);
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! inc_new_high_rate_counter_debug {
|
||||
($name:expr, $count:expr) => {{
|
||||
inc_new_counter!(
|
||||
$name,
|
||||
$count,
|
||||
log::Level::Debug,
|
||||
0,
|
||||
$crate::counter::HIGH_RATE
|
||||
);
|
||||
}};
|
||||
}
|
||||
|
||||
impl Counter {
|
||||
fn default_metrics_high_rate() -> usize {
|
||||
let v = env::var("SOLANA_METRICS_HIGH_RATE")
|
||||
.map(|x| x.parse().unwrap_or(0))
|
||||
.unwrap_or(0);
|
||||
if v == 0 {
|
||||
DEFAULT_METRICS_HIGH_RATE
|
||||
} else {
|
||||
v
|
||||
}
|
||||
}
|
||||
fn default_log_rate() -> usize {
|
||||
let v = env::var("SOLANA_DEFAULT_LOG_RATE")
|
||||
.map(|x| x.parse().unwrap_or(DEFAULT_LOG_RATE))
|
||||
|
@ -142,20 +208,22 @@ impl Counter {
|
|||
.add_field("count", influxdb::Value::Integer(0))
|
||||
.to_owned(),
|
||||
);
|
||||
self.lograte
|
||||
.compare_and_swap(0, Self::default_log_rate(), Ordering::Relaxed);
|
||||
self.metricsrate.compare_and_swap(
|
||||
HIGH_RATE,
|
||||
Self::default_metrics_high_rate(),
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
self.metricsrate
|
||||
.compare_and_swap(0, DEFAULT_METRICS_RATE, Ordering::Relaxed);
|
||||
}
|
||||
pub fn inc(&mut self, level: log::Level, events: usize) {
|
||||
let counts = self.counts.fetch_add(events, Ordering::Relaxed);
|
||||
let times = self.times.fetch_add(1, Ordering::Relaxed);
|
||||
let mut lograte = self.lograte.load(Ordering::Relaxed);
|
||||
if lograte == 0 {
|
||||
lograte = Self::default_log_rate();
|
||||
self.lograte.store(lograte, Ordering::Relaxed);
|
||||
}
|
||||
let mut metricsrate = self.metricsrate.load(Ordering::Relaxed);
|
||||
if metricsrate == 0 {
|
||||
metricsrate = DEFAULT_METRICS_RATE;
|
||||
self.metricsrate.store(metricsrate, Ordering::Relaxed);
|
||||
}
|
||||
let lograte = self.lograte.load(Ordering::Relaxed);
|
||||
let metricsrate = self.metricsrate.load(Ordering::Relaxed);
|
||||
|
||||
if times % lograte == 0 && times > 0 && log_enabled!(level) {
|
||||
log!(level,
|
||||
"COUNTER:{{\"name\": \"{}\", \"counts\": {}, \"samples\": {}, \"now\": {}, \"events\": {}}}",
|
||||
|
@ -191,7 +259,7 @@ impl Counter {
|
|||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::counter::{Counter, DEFAULT_LOG_RATE};
|
||||
use crate::counter::{Counter, DEFAULT_LOG_RATE, DEFAULT_METRICS_HIGH_RATE, HIGH_RATE};
|
||||
use log::Level;
|
||||
use log::*;
|
||||
use serial_test_derive::serial;
|
||||
|
@ -219,6 +287,9 @@ mod tests {
|
|||
.ok();
|
||||
let _readlock = get_env_lock().read();
|
||||
static mut COUNTER: Counter = create_counter!("test", 1000, 1);
|
||||
unsafe {
|
||||
COUNTER.init();
|
||||
}
|
||||
let count = 1;
|
||||
inc_counter!(COUNTER, Level::Info, count);
|
||||
unsafe {
|
||||
|
@ -239,6 +310,40 @@ mod tests {
|
|||
assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 399);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_high_rate_counter() {
|
||||
env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("solana=info"))
|
||||
.try_init()
|
||||
.ok();
|
||||
let _readlock = get_env_lock().read();
|
||||
static mut COUNTER: Counter = create_counter!("test", 1000, HIGH_RATE);
|
||||
env::remove_var("SOLANA_METRICS_HIGH_RATE");
|
||||
unsafe {
|
||||
COUNTER.init();
|
||||
assert_eq!(
|
||||
COUNTER.metricsrate.load(Ordering::Relaxed),
|
||||
DEFAULT_METRICS_HIGH_RATE
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_high_rate_counter_env() {
|
||||
env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("solana=info"))
|
||||
.try_init()
|
||||
.ok();
|
||||
let _writelock = get_env_lock().write();
|
||||
static mut COUNTER: Counter = create_counter!("test", 1000, HIGH_RATE);
|
||||
env::set_var("SOLANA_METRICS_HIGH_RATE", "50");
|
||||
unsafe {
|
||||
COUNTER.init();
|
||||
assert_eq!(COUNTER.metricsrate.load(Ordering::Relaxed), 50);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_inc_new_counter() {
|
||||
|
@ -249,6 +354,7 @@ mod tests {
|
|||
inc_new_counter_info!("2", 1, 3);
|
||||
inc_new_counter_info!("3", 1, 2, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_lograte() {
|
||||
|
@ -264,8 +370,8 @@ mod tests {
|
|||
DEFAULT_LOG_RATE,
|
||||
);
|
||||
static mut COUNTER: Counter = create_counter!("test_lograte", 0, 1);
|
||||
inc_counter!(COUNTER, Level::Error, 2);
|
||||
unsafe {
|
||||
COUNTER.init();
|
||||
assert_eq!(COUNTER.lograte.load(Ordering::Relaxed), DEFAULT_LOG_RATE);
|
||||
}
|
||||
}
|
||||
|
@ -280,15 +386,15 @@ mod tests {
|
|||
let _writelock = get_env_lock().write();
|
||||
static mut COUNTER: Counter = create_counter!("test_lograte_env", 0, 1);
|
||||
env::set_var("SOLANA_DEFAULT_LOG_RATE", "50");
|
||||
inc_counter!(COUNTER, Level::Error, 2);
|
||||
unsafe {
|
||||
COUNTER.init();
|
||||
assert_eq!(COUNTER.lograte.load(Ordering::Relaxed), 50);
|
||||
}
|
||||
|
||||
static mut COUNTER2: Counter = create_counter!("test_lograte_env", 0, 1);
|
||||
env::set_var("SOLANA_DEFAULT_LOG_RATE", "0");
|
||||
inc_counter!(COUNTER2, Level::Error, 2);
|
||||
unsafe {
|
||||
COUNTER2.init();
|
||||
assert_eq!(COUNTER2.lograte.load(Ordering::Relaxed), DEFAULT_LOG_RATE);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,6 +60,10 @@ solana_replicator=$(solana_program replicator)
|
|||
|
||||
export RUST_BACKTRACE=1
|
||||
|
||||
if [[ -z $SOLANA_METRICS_HIGH_RATE ]]; then
|
||||
export SOLANA_METRICS_HIGH_RATE=1000
|
||||
fi
|
||||
|
||||
# shellcheck source=scripts/configure-metrics.sh
|
||||
source "$SOLANA_ROOT"/scripts/configure-metrics.sh
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ use crate::exchange_instruction::*;
|
|||
use crate::exchange_state::*;
|
||||
use crate::faucet;
|
||||
use log::*;
|
||||
use solana_metrics::inc_new_counter_info;
|
||||
use solana_metrics::inc_new_high_rate_counter_info;
|
||||
use solana_sdk::account::KeyedAccount;
|
||||
use solana_sdk::instruction::InstructionError;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
|
@ -296,7 +296,7 @@ impl ExchangeProcessor {
|
|||
// Trade holds the tokens in escrow
|
||||
account.tokens[from_token] -= info.tokens;
|
||||
|
||||
inc_new_counter_info!("exchange_processor-trades", 1, 1000, 1000);
|
||||
inc_new_high_rate_counter_info!("exchange_processor-trades", 1);
|
||||
|
||||
Self::serialize(
|
||||
&ExchangeState::Trade(OrderInfo {
|
||||
|
@ -390,7 +390,7 @@ impl ExchangeProcessor {
|
|||
Err(e)?
|
||||
}
|
||||
|
||||
inc_new_counter_info!("exchange_processor-swaps", 1, 1000, 1000);
|
||||
inc_new_high_rate_counter_info!("exchange_processor-swaps", 1);
|
||||
|
||||
if to_order.tokens == 0 {
|
||||
// Turn into token account
|
||||
|
|
|
@ -25,7 +25,7 @@ use log::*;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_metrics::{
|
||||
datapoint_info, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_info,
|
||||
datapoint_info, inc_new_counter_debug, inc_new_counter_error, inc_new_high_rate_counter_info,
|
||||
};
|
||||
use solana_sdk::account::Account;
|
||||
use solana_sdk::fee_calculator::FeeCalculator;
|
||||
|
@ -856,59 +856,45 @@ impl Bank {
|
|||
|
||||
fn update_error_counters(error_counters: &ErrorCounters) {
|
||||
if 0 != error_counters.blockhash_not_found {
|
||||
inc_new_counter_error!(
|
||||
inc_new_high_rate_counter_error!(
|
||||
"bank-process_transactions-error-blockhash_not_found",
|
||||
error_counters.blockhash_not_found,
|
||||
0,
|
||||
1000
|
||||
error_counters.blockhash_not_found
|
||||
);
|
||||
}
|
||||
if 0 != error_counters.invalid_account_index {
|
||||
inc_new_counter_error!(
|
||||
inc_new_high_rate_counter_error!(
|
||||
"bank-process_transactions-error-invalid_account_index",
|
||||
error_counters.invalid_account_index,
|
||||
0,
|
||||
1000
|
||||
error_counters.invalid_account_index
|
||||
);
|
||||
}
|
||||
if 0 != error_counters.reserve_blockhash {
|
||||
inc_new_counter_error!(
|
||||
inc_new_high_rate_counter_error!(
|
||||
"bank-process_transactions-error-reserve_blockhash",
|
||||
error_counters.reserve_blockhash,
|
||||
0,
|
||||
1000
|
||||
error_counters.reserve_blockhash
|
||||
);
|
||||
}
|
||||
if 0 != error_counters.duplicate_signature {
|
||||
inc_new_counter_error!(
|
||||
inc_new_high_rate_counter_error!(
|
||||
"bank-process_transactions-error-duplicate_signature",
|
||||
error_counters.duplicate_signature,
|
||||
0,
|
||||
1000
|
||||
error_counters.duplicate_signature
|
||||
);
|
||||
}
|
||||
if 0 != error_counters.invalid_account_for_fee {
|
||||
inc_new_counter_error!(
|
||||
inc_new_high_rate_counter_error!(
|
||||
"bank-process_transactions-error-invalid_account_for_fee",
|
||||
error_counters.invalid_account_for_fee,
|
||||
0,
|
||||
1000
|
||||
error_counters.invalid_account_for_fee
|
||||
);
|
||||
}
|
||||
if 0 != error_counters.insufficient_funds {
|
||||
inc_new_counter_error!(
|
||||
inc_new_high_rate_counter_error!(
|
||||
"bank-process_transactions-error-insufficient_funds",
|
||||
error_counters.insufficient_funds,
|
||||
0,
|
||||
1000
|
||||
error_counters.insufficient_funds
|
||||
);
|
||||
}
|
||||
if 0 != error_counters.account_loaded_twice {
|
||||
inc_new_counter_error!(
|
||||
inc_new_high_rate_counter_error!(
|
||||
"bank-process_transactions-account_loaded_twice",
|
||||
error_counters.account_loaded_twice,
|
||||
0,
|
||||
1000
|
||||
error_counters.account_loaded_twice
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -927,7 +913,7 @@ impl Bank {
|
|||
usize,
|
||||
) {
|
||||
debug!("processing transactions: {}", txs.len());
|
||||
inc_new_counter_info!("bank-process_transactions", txs.len());
|
||||
inc_new_high_rate_counter_info!("bank-process_transactions", txs.len());
|
||||
let mut error_counters = ErrorCounters::default();
|
||||
let mut load_time = Measure::start("accounts_load");
|
||||
|
||||
|
@ -988,13 +974,11 @@ impl Bank {
|
|||
}
|
||||
if err_count > 0 {
|
||||
debug!("{} errors of {} txs", err_count, err_count + tx_count);
|
||||
inc_new_counter_error!(
|
||||
inc_new_high_rate_counter_error!(
|
||||
"bank-process_transactions-account_not_found",
|
||||
error_counters.account_not_found,
|
||||
0,
|
||||
1000
|
||||
error_counters.account_not_found
|
||||
);
|
||||
inc_new_counter_error!("bank-process_transactions-error_count", err_count, 0, 1000);
|
||||
inc_new_counter_error!("bank-process_transactions-error_count", err_count);
|
||||
}
|
||||
|
||||
Self::update_error_counters(&error_counters);
|
||||
|
@ -1066,8 +1050,8 @@ impl Bank {
|
|||
self.increment_transaction_count(tx_count);
|
||||
self.increment_signature_count(signature_count);
|
||||
|
||||
inc_new_counter_info!("bank-process_transactions-txs", tx_count, 0, 1000);
|
||||
inc_new_counter_info!("bank-process_transactions-sigs", signature_count, 0, 1000);
|
||||
inc_new_high_rate_counter_info!("bank-process_transactions-txs", tx_count);
|
||||
inc_new_high_rate_counter_info!("bank-process_transactions-sigs", signature_count);
|
||||
|
||||
if executed.iter().any(|res| Self::can_commit(res)) {
|
||||
self.is_delta.store(true, Ordering::Relaxed);
|
||||
|
|
Loading…
Reference in New Issue