send-transaction-service metrics improvement (#24816)

Use data_point report metrics periodically -- every 5 seconds for better metrics correlation and reduce noisy metrics.
This commit is contained in:
Lijun Wang 2022-05-04 17:06:21 -07:00 committed by GitHub
parent 9bca909f63
commit 3852959ac8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 242 additions and 65 deletions

View File

@ -4,9 +4,12 @@ use {
log::*,
solana_client::connection_cache,
solana_measure::measure::Measure,
solana_metrics::{datapoint_warn, inc_new_counter_info},
solana_metrics::datapoint_warn,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{hash::Hash, nonce_account, pubkey::Pubkey, signature::Signature},
solana_sdk::{
hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature,
timing::AtomicInterval, transport::TransportError,
},
std::{
collections::{
hash_map::{Entry, HashMap},
@ -14,7 +17,7 @@ use {
},
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
@ -181,6 +184,150 @@ where
}
}
/// Metrics of the send-transaction-service.
#[derive(Default)]
struct SendTransactionServiceStats {
/// Count of the received transactions
received_transactions: AtomicU64,
/// Count of the received duplicate transactions
received_duplicate_transactions: AtomicU64,
/// Count of transactions sent in batch
sent_transactions: AtomicU64,
/// Count of transactions not being added to retry queue
/// due to queue size limit
retry_queue_overflow: AtomicU64,
/// retry queue size
retry_queue_size: AtomicU64,
/// The count of calls of sending transactions which can be in batch or single.
send_attempt_count: AtomicU64,
/// Time spent on transactions in micro seconds
send_us: AtomicU64,
/// Send failure count
send_failure_count: AtomicU64,
/// Count of nonced transactions
nonced_transactions: AtomicU64,
/// Count of rooted transactions
rooted_transactions: AtomicU64,
/// Count of expired transactions
expired_transactions: AtomicU64,
/// Count of transactions exceeding max retries
transactions_exceeding_max_retries: AtomicU64,
/// Count of retries of transactions
retries: AtomicU64,
/// Count of transactions failed
failed_transactions: AtomicU64,
}
#[derive(Default)]
struct SendTransactionServiceStatsReport {
stats: SendTransactionServiceStats,
last_report: AtomicInterval,
}
impl SendTransactionServiceStatsReport {
/// report metrics of the send transaction service
fn report(&self) {
if self
.last_report
.should_update(SEND_TRANSACTION_METRICS_REPORT_RATE_MS)
{
datapoint_info!(
"send_transaction_service",
(
"recv-tx",
self.stats.received_transactions.swap(0, Ordering::Relaxed),
i64
),
(
"recv-duplicate",
self.stats
.received_duplicate_transactions
.swap(0, Ordering::Relaxed),
i64
),
(
"sent-tx",
self.stats.sent_transactions.swap(0, Ordering::Relaxed),
i64
),
(
"retry-queue-overflow",
self.stats.retry_queue_overflow.swap(0, Ordering::Relaxed),
i64
),
(
"retry-queue-size",
self.stats.retry_queue_size.swap(0, Ordering::Relaxed),
i64
),
(
"send-us",
self.stats.send_us.swap(0, Ordering::Relaxed),
i64
),
(
"send-attempt-count",
self.stats.send_attempt_count.swap(0, Ordering::Relaxed),
i64
),
(
"send-failure-count",
self.stats.send_failure_count.swap(0, Ordering::Relaxed),
i64
),
(
"nonced-tx",
self.stats.nonced_transactions.swap(0, Ordering::Relaxed),
i64
),
(
"rooted-tx",
self.stats.rooted_transactions.swap(0, Ordering::Relaxed),
i64
),
(
"expired-tx",
self.stats.expired_transactions.swap(0, Ordering::Relaxed),
i64
),
(
"max-retries-exceeded-tx",
self.stats
.transactions_exceeding_max_retries
.swap(0, Ordering::Relaxed),
i64
),
(
"retries",
self.stats.retries.swap(0, Ordering::Relaxed),
i64
),
(
"failed-tx",
self.stats.failed_transactions.swap(0, Ordering::Relaxed),
i64
)
);
}
}
}
/// Report the send transaction memtrics for every 5 seconds.
const SEND_TRANSACTION_METRICS_REPORT_RATE_MS: u64 = 5000;
impl SendTransactionService {
pub fn new<T: TpuInfo + std::marker::Send + 'static>(
tpu_address: SocketAddr,
@ -207,6 +354,8 @@ impl SendTransactionService {
receiver: Receiver<TransactionInfo>,
config: Config,
) -> Self {
let stats_report = Arc::new(SendTransactionServiceStatsReport::default());
let retry_transactions = Arc::new(Mutex::new(HashMap::new()));
let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info)));
@ -218,6 +367,7 @@ impl SendTransactionService {
leader_info_provider.clone(),
config.clone(),
retry_transactions.clone(),
stats_report.clone(),
exit.clone(),
);
@ -227,6 +377,7 @@ impl SendTransactionService {
leader_info_provider,
config,
retry_transactions,
stats_report,
exit.clone(),
);
Self {
@ -243,6 +394,7 @@ impl SendTransactionService {
leader_info_provider: Arc<Mutex<CurrentLeaderInfo<T>>>,
config: Config,
retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
stats_report: Arc<SendTransactionServiceStatsReport>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let mut last_batch_sent = Instant::now();
@ -257,6 +409,7 @@ impl SendTransactionService {
.name("send-tx-receive".to_string())
.spawn(move || loop {
let recv_timeout_ms = config.batch_send_rate_ms;
let stats = &stats_report.stats;
match receiver.recv_timeout(Duration::from_millis(recv_timeout_ms)) {
Err(RecvTimeoutError::Disconnected) => {
info!("Terminating send-transaction-service.");
@ -265,7 +418,7 @@ impl SendTransactionService {
}
Err(RecvTimeoutError::Timeout) => {}
Ok(transaction_info) => {
inc_new_counter_info!("send_transaction_service-recv-tx", 1);
stats.received_transactions.fetch_add(1, Ordering::Relaxed);
let entry = transactions.entry(transaction_info.signature);
let mut new_transaction = false;
if let Entry::Vacant(_) = entry {
@ -279,7 +432,9 @@ impl SendTransactionService {
}
}
if !new_transaction {
inc_new_counter_info!("send_transaction_service-recv-duplicate", 1);
stats
.received_duplicate_transactions
.fetch_add(1, Ordering::Relaxed);
}
}
}
@ -288,20 +443,22 @@ impl SendTransactionService {
&& last_batch_sent.elapsed().as_millis() as u64 >= config.batch_send_rate_ms)
|| transactions.len() >= config.batch_size
{
inc_new_counter_info!(
"send_transaction_service-batch-size",
transactions.len()
);
stats
.sent_transactions
.fetch_add(transactions.len() as u64, Ordering::Relaxed);
let _result = Self::send_transactions_in_batch(
&tpu_address,
&mut transactions,
leader_info_provider.lock().unwrap().get_leader_info(),
&config,
stats,
);
let last_sent_time = Instant::now();
{
// take a lock of retry_transactions and move the batch to the retry set.
let mut retry_transactions = retry_transactions.lock().unwrap();
let transactions_to_retry = transactions.len();
let mut transactions_added_to_retry: usize = 0;
for (signature, mut transaction_info) in transactions.drain() {
let retry_len = retry_transactions.len();
let entry = retry_transactions.entry(signature);
@ -311,14 +468,23 @@ impl SendTransactionService {
break;
} else {
transaction_info.last_sent_time = Some(last_sent_time);
saturating_add_assign!(transactions_added_to_retry, 1);
entry.or_insert(transaction_info);
}
}
}
stats.retry_queue_overflow.fetch_add(
transactions_to_retry.saturating_sub(transactions_added_to_retry)
as u64,
Ordering::Relaxed,
);
stats
.retry_queue_size
.store(retry_transactions.len() as u64, Ordering::Relaxed);
}
last_batch_sent = Instant::now();
}
stats_report.report();
})
.unwrap()
}
@ -330,6 +496,7 @@ impl SendTransactionService {
leader_info_provider: Arc<Mutex<CurrentLeaderInfo<T>>>,
config: Config,
retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
stats_report: Arc<SendTransactionServiceStatsReport>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
info!(
@ -341,6 +508,7 @@ impl SendTransactionService {
.name("send-tx-retry".to_string())
.spawn(move || loop {
let retry_interval_ms = config.retry_rate_ms;
let stats = &stats_report.stats;
sleep(Duration::from_millis(
MAX_RETRY_SLEEP_MS.min(retry_interval_ms),
));
@ -349,10 +517,9 @@ impl SendTransactionService {
}
let mut transactions = retry_transactions.lock().unwrap();
if !transactions.is_empty() {
datapoint_info!(
"send_transaction_service-queue-size",
("len", transactions.len(), i64)
);
stats
.retry_queue_size
.store(transactions.len() as u64, Ordering::Relaxed);
let (root_bank, working_bank) = {
let bank_forks = bank_forks.read().unwrap();
(
@ -368,7 +535,9 @@ impl SendTransactionService {
&mut transactions,
&leader_info_provider,
&config,
stats,
);
stats_report.report();
}
})
.unwrap()
@ -380,9 +549,8 @@ impl SendTransactionService {
transactions: &mut HashMap<Signature, TransactionInfo>,
leader_info: Option<&T>,
config: &Config,
stats: &SendTransactionServiceStats,
) {
let mut measure = Measure::start("send_transactions_in_batch-us");
// Processing the transactions in batch
let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config);
@ -392,15 +560,8 @@ impl SendTransactionService {
.collect::<Vec<&[u8]>>();
for address in &addresses {
Self::send_transactions(address, &wire_transactions);
Self::send_transactions(address, &wire_transactions, stats);
}
measure.stop();
inc_new_counter_info!(
"send_transactions_in_batch-us",
measure.as_us() as usize,
1000,
1000
);
}
/// Retry transactions sent before.
@ -411,6 +572,7 @@ impl SendTransactionService {
transactions: &mut HashMap<Signature, TransactionInfo>,
leader_info_provider: &Arc<Mutex<CurrentLeaderInfo<T>>>,
config: &Config,
stats: &SendTransactionServiceStats,
) -> ProcessTransactionsResult {
let mut result = ProcessTransactionsResult::default();
@ -419,12 +581,12 @@ impl SendTransactionService {
transactions.retain(|signature, mut transaction_info| {
if transaction_info.durable_nonce_info.is_some() {
inc_new_counter_info!("send_transaction_service-nonced", 1);
stats.nonced_transactions.fetch_add(1, Ordering::Relaxed);
}
if root_bank.has_signature(signature) {
info!("Transaction is rooted: {}", signature);
result.rooted += 1;
inc_new_counter_info!("send_transaction_service-rooted", 1);
stats.rooted_transactions.fetch_add(1, Ordering::Relaxed);
return false;
}
let signature_status = working_bank.get_signature_status_slot(signature);
@ -441,14 +603,14 @@ impl SendTransactionService {
{
info!("Dropping expired durable-nonce transaction: {}", signature);
result.expired += 1;
inc_new_counter_info!("send_transaction_service-expired", 1);
stats.expired_transactions.fetch_add(1, Ordering::Relaxed);
return false;
}
}
if transaction_info.last_valid_block_height < root_bank.block_height() {
info!("Dropping expired transaction: {}", signature);
result.expired += 1;
inc_new_counter_info!("send_transaction_service-expired", 1);
stats.expired_transactions.fetch_add(1, Ordering::Relaxed);
return false;
}
@ -461,7 +623,9 @@ impl SendTransactionService {
if transaction_info.retries >= max_retries {
info!("Dropping transaction due to max retries: {}", signature);
result.max_retries_elapsed += 1;
inc_new_counter_info!("send_transaction_service-max_retries", 1);
stats
.transactions_exceeding_max_retries
.fetch_add(1, Ordering::Relaxed);
return false;
}
}
@ -481,8 +645,7 @@ impl SendTransactionService {
info!("Retrying transaction: {}", signature);
result.retried += 1;
transaction_info.retries += 1;
inc_new_counter_info!("send_transaction_service-retry", 1);
stats.retries.fetch_add(1, Ordering::Relaxed);
}
batched_transactions.insert(*signature);
@ -494,7 +657,7 @@ impl SendTransactionService {
if status.is_err() {
info!("Dropping failed transaction: {}", signature);
result.failed += 1;
inc_new_counter_info!("send_transaction_service-failed", 1);
stats.failed_transactions.fetch_add(1, Ordering::Relaxed);
false
} else {
result.retained += 1;
@ -519,54 +682,51 @@ impl SendTransactionService {
let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config);
for address in &addresses {
Self::send_transactions(address, chunk);
Self::send_transactions(address, chunk, stats);
}
}
}
result
}
fn send_transaction(tpu_address: &SocketAddr, wire_transaction: &[u8]) {
let mut measure = Measure::start("send_transaction_service-us");
if let Err(err) =
connection_cache::send_wire_transaction_async(wire_transaction.to_vec(), tpu_address)
{
warn!("Failed to send transaction to {}: {:?}", tpu_address, err);
}
measure.stop();
inc_new_counter_info!(
"send_transaction_service-us",
measure.as_us() as usize,
1000,
1000
);
fn send_transaction(
tpu_address: &SocketAddr,
wire_transaction: &[u8],
) -> Result<(), TransportError> {
connection_cache::send_wire_transaction_async(wire_transaction.to_vec(), tpu_address)
}
fn send_transactions_with_metrics(tpu_address: &SocketAddr, wire_transactions: &[&[u8]]) {
let mut measure = Measure::start("send_transaction_service-batch-us");
fn send_transactions_with_metrics(
tpu_address: &SocketAddr,
wire_transactions: &[&[u8]],
) -> Result<(), TransportError> {
let wire_transactions = wire_transactions.iter().map(|t| t.to_vec()).collect();
let send_result =
connection_cache::send_wire_transaction_batch_async(wire_transactions, tpu_address);
if let Err(err) = send_result {
warn!(
"Failed to send transaction batch to {}: {:?}",
tpu_address, err
);
}
measure.stop();
inc_new_counter_info!(
"send_transaction_service-batch-us",
measure.as_us() as usize
);
connection_cache::send_wire_transaction_batch_async(wire_transactions, tpu_address)
}
fn send_transactions(tpu_address: &SocketAddr, wire_transactions: &[&[u8]]) {
if wire_transactions.len() == 1 {
fn send_transactions(
tpu_address: &SocketAddr,
wire_transactions: &[&[u8]],
stats: &SendTransactionServiceStats,
) {
let mut measure = Measure::start("send-us");
let result = if wire_transactions.len() == 1 {
Self::send_transaction(tpu_address, wire_transactions[0])
} else {
Self::send_transactions_with_metrics(tpu_address, wire_transactions)
};
if let Err(err) = result {
warn!(
"Failed to send transaction transaction to {}: {:?}",
tpu_address, err
);
stats.send_failure_count.fetch_add(1, Ordering::Relaxed);
}
measure.stop();
stats.send_us.fetch_add(measure.as_us(), Ordering::Relaxed);
stats.send_attempt_count.fetch_add(1, Ordering::Relaxed);
}
fn get_tpu_addresses<'a, T: TpuInfo>(
@ -670,6 +830,7 @@ mod test {
info!("Expired transactions are dropped...");
let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None)));
let stats = SendTransactionServiceStats::default();
transactions.insert(
Signature::default(),
TransactionInfo::new(
@ -688,6 +849,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert!(transactions.is_empty());
assert_eq!(
@ -717,6 +879,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert!(transactions.is_empty());
assert_eq!(
@ -746,6 +909,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert!(transactions.is_empty());
assert_eq!(
@ -775,6 +939,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
@ -806,6 +971,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
@ -847,6 +1013,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
@ -864,6 +1031,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert!(transactions.is_empty());
assert_eq!(
@ -937,6 +1105,7 @@ mod test {
),
);
let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None)));
let stats = SendTransactionServiceStats::default();
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank,
&root_bank,
@ -944,6 +1113,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert!(transactions.is_empty());
assert_eq!(
@ -972,6 +1142,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert!(transactions.is_empty());
assert_eq!(
@ -1002,6 +1173,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert!(transactions.is_empty());
assert_eq!(
@ -1030,6 +1202,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert!(transactions.is_empty());
assert_eq!(
@ -1059,6 +1232,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert!(transactions.is_empty());
assert_eq!(
@ -1088,6 +1262,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
@ -1119,6 +1294,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
@ -1147,6 +1323,7 @@ mod test {
&mut transactions,
&leader_info_provider,
&config,
&stats,
);
assert_eq!(transactions.len(), 0);
assert_eq!(