diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 43c9793ea4..5596f51fcd 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -4,9 +4,12 @@ use { log::*, solana_client::connection_cache, solana_measure::measure::Measure, - solana_metrics::{datapoint_warn, inc_new_counter_info}, + solana_metrics::datapoint_warn, solana_runtime::{bank::Bank, bank_forks::BankForks}, - solana_sdk::{hash::Hash, nonce_account, pubkey::Pubkey, signature::Signature}, + solana_sdk::{ + hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature, + timing::AtomicInterval, transport::TransportError, + }, std::{ collections::{ hash_map::{Entry, HashMap}, @@ -14,7 +17,7 @@ use { }, net::SocketAddr, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, Mutex, RwLock, }, thread::{self, sleep, Builder, JoinHandle}, @@ -181,6 +184,150 @@ where } } +/// Metrics of the send-transaction-service. +#[derive(Default)] +struct SendTransactionServiceStats { + /// Count of the received transactions + received_transactions: AtomicU64, + + /// Count of the received duplicate transactions + received_duplicate_transactions: AtomicU64, + + /// Count of transactions sent in batch + sent_transactions: AtomicU64, + + /// Count of transactions not being added to retry queue + /// due to queue size limit + retry_queue_overflow: AtomicU64, + + /// retry queue size + retry_queue_size: AtomicU64, + + /// The count of calls of sending transactions which can be in batch or single. 
+ send_attempt_count: AtomicU64, + + /// Time spent on transactions in micro seconds + send_us: AtomicU64, + + /// Send failure count + send_failure_count: AtomicU64, + + /// Count of nonced transactions + nonced_transactions: AtomicU64, + + /// Count of rooted transactions + rooted_transactions: AtomicU64, + + /// Count of expired transactions + expired_transactions: AtomicU64, + + /// Count of transactions exceeding max retries + transactions_exceeding_max_retries: AtomicU64, + + /// Count of retries of transactions + retries: AtomicU64, + + /// Count of transactions failed + failed_transactions: AtomicU64, +} + +#[derive(Default)] +struct SendTransactionServiceStatsReport { + stats: SendTransactionServiceStats, + last_report: AtomicInterval, +} + +impl SendTransactionServiceStatsReport { + /// report metrics of the send transaction service + fn report(&self) { + if self + .last_report + .should_update(SEND_TRANSACTION_METRICS_REPORT_RATE_MS) + { + datapoint_info!( + "send_transaction_service", + ( + "recv-tx", + self.stats.received_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "recv-duplicate", + self.stats + .received_duplicate_transactions + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "sent-tx", + self.stats.sent_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "retry-queue-overflow", + self.stats.retry_queue_overflow.swap(0, Ordering::Relaxed), + i64 + ), + ( + "retry-queue-size", + self.stats.retry_queue_size.swap(0, Ordering::Relaxed), + i64 + ), + ( + "send-us", + self.stats.send_us.swap(0, Ordering::Relaxed), + i64 + ), + ( + "send-attempt-count", + self.stats.send_attempt_count.swap(0, Ordering::Relaxed), + i64 + ), + ( + "send-failure-count", + self.stats.send_failure_count.swap(0, Ordering::Relaxed), + i64 + ), + ( + "nonced-tx", + self.stats.nonced_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "rooted-tx", + self.stats.rooted_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "expired-tx", + 
self.stats.expired_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "max-retries-exceeded-tx", + self.stats + .transactions_exceeding_max_retries + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "retries", + self.stats.retries.swap(0, Ordering::Relaxed), + i64 + ), + ( + "failed-tx", + self.stats.failed_transactions.swap(0, Ordering::Relaxed), + i64 + ) + ); + } + } +} + +/// Report the send transaction metrics every 5 seconds. +const SEND_TRANSACTION_METRICS_REPORT_RATE_MS: u64 = 5000; + impl SendTransactionService { pub fn new( tpu_address: SocketAddr, @@ -207,6 +354,8 @@ impl SendTransactionService { receiver: Receiver, config: Config, ) -> Self { + let stats_report = Arc::new(SendTransactionServiceStatsReport::default()); + let retry_transactions = Arc::new(Mutex::new(HashMap::new())); let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info))); @@ -218,6 +367,7 @@ impl SendTransactionService { leader_info_provider.clone(), config.clone(), retry_transactions.clone(), + stats_report.clone(), exit.clone(), ); @@ -227,6 +377,7 @@ impl SendTransactionService { leader_info_provider, config, retry_transactions, + stats_report, exit.clone(), ); Self { @@ -243,6 +394,7 @@ impl SendTransactionService { leader_info_provider: Arc>>, config: Config, retry_transactions: Arc>>, + stats_report: Arc, exit: Arc, ) -> JoinHandle<()> { let mut last_batch_sent = Instant::now(); @@ -257,6 +409,7 @@ impl SendTransactionService { .name("send-tx-receive".to_string()) .spawn(move || loop { let recv_timeout_ms = config.batch_send_rate_ms; + let stats = &stats_report.stats; match receiver.recv_timeout(Duration::from_millis(recv_timeout_ms)) { Err(RecvTimeoutError::Disconnected) => { info!("Terminating send-transaction-service."); @@ -265,7 +418,7 @@ impl SendTransactionService { } Err(RecvTimeoutError::Timeout) => {} Ok(transaction_info) => { - inc_new_counter_info!("send_transaction_service-recv-tx", 1); + stats.received_transactions.fetch_add(1, 
Ordering::Relaxed); let entry = transactions.entry(transaction_info.signature); let mut new_transaction = false; if let Entry::Vacant(_) = entry { @@ -279,7 +432,9 @@ impl SendTransactionService { } } if !new_transaction { - inc_new_counter_info!("send_transaction_service-recv-duplicate", 1); + stats + .received_duplicate_transactions + .fetch_add(1, Ordering::Relaxed); } } } @@ -288,20 +443,22 @@ impl SendTransactionService { && last_batch_sent.elapsed().as_millis() as u64 >= config.batch_send_rate_ms) || transactions.len() >= config.batch_size { - inc_new_counter_info!( - "send_transaction_service-batch-size", - transactions.len() - ); + stats + .sent_transactions + .fetch_add(transactions.len() as u64, Ordering::Relaxed); let _result = Self::send_transactions_in_batch( &tpu_address, &mut transactions, leader_info_provider.lock().unwrap().get_leader_info(), &config, + stats, ); let last_sent_time = Instant::now(); { // take a lock of retry_transactions and move the batch to the retry set. 
let mut retry_transactions = retry_transactions.lock().unwrap(); + let transactions_to_retry = transactions.len(); + let mut transactions_added_to_retry: usize = 0; for (signature, mut transaction_info) in transactions.drain() { let retry_len = retry_transactions.len(); let entry = retry_transactions.entry(signature); @@ -311,14 +468,23 @@ impl SendTransactionService { break; } else { transaction_info.last_sent_time = Some(last_sent_time); + saturating_add_assign!(transactions_added_to_retry, 1); entry.or_insert(transaction_info); } } } + stats.retry_queue_overflow.fetch_add( + transactions_to_retry.saturating_sub(transactions_added_to_retry) + as u64, + Ordering::Relaxed, + ); + stats + .retry_queue_size + .store(retry_transactions.len() as u64, Ordering::Relaxed); } - last_batch_sent = Instant::now(); } + stats_report.report(); }) .unwrap() } @@ -330,6 +496,7 @@ impl SendTransactionService { leader_info_provider: Arc>>, config: Config, retry_transactions: Arc>>, + stats_report: Arc, exit: Arc, ) -> JoinHandle<()> { info!( @@ -341,6 +508,7 @@ impl SendTransactionService { .name("send-tx-retry".to_string()) .spawn(move || loop { let retry_interval_ms = config.retry_rate_ms; + let stats = &stats_report.stats; sleep(Duration::from_millis( MAX_RETRY_SLEEP_MS.min(retry_interval_ms), )); @@ -349,10 +517,9 @@ impl SendTransactionService { } let mut transactions = retry_transactions.lock().unwrap(); if !transactions.is_empty() { - datapoint_info!( - "send_transaction_service-queue-size", - ("len", transactions.len(), i64) - ); + stats + .retry_queue_size + .store(transactions.len() as u64, Ordering::Relaxed); let (root_bank, working_bank) = { let bank_forks = bank_forks.read().unwrap(); ( @@ -368,7 +535,9 @@ impl SendTransactionService { &mut transactions, &leader_info_provider, &config, + stats, ); + stats_report.report(); } }) .unwrap() @@ -380,9 +549,8 @@ impl SendTransactionService { transactions: &mut HashMap, leader_info: Option<&T>, config: &Config, + stats: 
&SendTransactionServiceStats, ) { - let mut measure = Measure::start("send_transactions_in_batch-us"); - // Processing the transactions in batch let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config); @@ -392,15 +560,8 @@ impl SendTransactionService { .collect::>(); for address in &addresses { - Self::send_transactions(address, &wire_transactions); + Self::send_transactions(address, &wire_transactions, stats); } - measure.stop(); - inc_new_counter_info!( - "send_transactions_in_batch-us", - measure.as_us() as usize, - 1000, - 1000 - ); } /// Retry transactions sent before. @@ -411,6 +572,7 @@ impl SendTransactionService { transactions: &mut HashMap, leader_info_provider: &Arc>>, config: &Config, + stats: &SendTransactionServiceStats, ) -> ProcessTransactionsResult { let mut result = ProcessTransactionsResult::default(); @@ -419,12 +581,12 @@ impl SendTransactionService { transactions.retain(|signature, mut transaction_info| { if transaction_info.durable_nonce_info.is_some() { - inc_new_counter_info!("send_transaction_service-nonced", 1); + stats.nonced_transactions.fetch_add(1, Ordering::Relaxed); } if root_bank.has_signature(signature) { info!("Transaction is rooted: {}", signature); result.rooted += 1; - inc_new_counter_info!("send_transaction_service-rooted", 1); + stats.rooted_transactions.fetch_add(1, Ordering::Relaxed); return false; } let signature_status = working_bank.get_signature_status_slot(signature); @@ -441,14 +603,14 @@ impl SendTransactionService { { info!("Dropping expired durable-nonce transaction: {}", signature); result.expired += 1; - inc_new_counter_info!("send_transaction_service-expired", 1); + stats.expired_transactions.fetch_add(1, Ordering::Relaxed); return false; } } if transaction_info.last_valid_block_height < root_bank.block_height() { info!("Dropping expired transaction: {}", signature); result.expired += 1; - inc_new_counter_info!("send_transaction_service-expired", 1); + stats.expired_transactions.fetch_add(1, 
Ordering::Relaxed); return false; } @@ -461,7 +623,9 @@ impl SendTransactionService { if transaction_info.retries >= max_retries { info!("Dropping transaction due to max retries: {}", signature); result.max_retries_elapsed += 1; - inc_new_counter_info!("send_transaction_service-max_retries", 1); + stats + .transactions_exceeding_max_retries + .fetch_add(1, Ordering::Relaxed); return false; } } @@ -481,8 +645,7 @@ impl SendTransactionService { info!("Retrying transaction: {}", signature); result.retried += 1; transaction_info.retries += 1; - - inc_new_counter_info!("send_transaction_service-retry", 1); + stats.retries.fetch_add(1, Ordering::Relaxed); } batched_transactions.insert(*signature); @@ -494,7 +657,7 @@ impl SendTransactionService { if status.is_err() { info!("Dropping failed transaction: {}", signature); result.failed += 1; - inc_new_counter_info!("send_transaction_service-failed", 1); + stats.failed_transactions.fetch_add(1, Ordering::Relaxed); false } else { result.retained += 1; @@ -519,54 +682,51 @@ impl SendTransactionService { let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config); for address in &addresses { - Self::send_transactions(address, chunk); + Self::send_transactions(address, chunk, stats); } } } result } - fn send_transaction(tpu_address: &SocketAddr, wire_transaction: &[u8]) { - let mut measure = Measure::start("send_transaction_service-us"); - if let Err(err) = - connection_cache::send_wire_transaction_async(wire_transaction.to_vec(), tpu_address) - { - warn!("Failed to send transaction to {}: {:?}", tpu_address, err); - } - measure.stop(); - inc_new_counter_info!( - "send_transaction_service-us", - measure.as_us() as usize, - 1000, - 1000 - ); + fn send_transaction( + tpu_address: &SocketAddr, + wire_transaction: &[u8], + ) -> Result<(), TransportError> { + connection_cache::send_wire_transaction_async(wire_transaction.to_vec(), tpu_address) } - fn send_transactions_with_metrics(tpu_address: &SocketAddr, 
wire_transactions: &[&[u8]]) { - let mut measure = Measure::start("send_transaction_service-batch-us"); - + fn send_transactions_with_metrics( + tpu_address: &SocketAddr, + wire_transactions: &[&[u8]], + ) -> Result<(), TransportError> { let wire_transactions = wire_transactions.iter().map(|t| t.to_vec()).collect(); - let send_result = - connection_cache::send_wire_transaction_batch_async(wire_transactions, tpu_address); - if let Err(err) = send_result { - warn!( - "Failed to send transaction batch to {}: {:?}", - tpu_address, err - ); - } - measure.stop(); - inc_new_counter_info!( - "send_transaction_service-batch-us", - measure.as_us() as usize - ); + connection_cache::send_wire_transaction_batch_async(wire_transactions, tpu_address) } - fn send_transactions(tpu_address: &SocketAddr, wire_transactions: &[&[u8]]) { - if wire_transactions.len() == 1 { + fn send_transactions( + tpu_address: &SocketAddr, + wire_transactions: &[&[u8]], + stats: &SendTransactionServiceStats, + ) { + let mut measure = Measure::start("send-us"); + let result = if wire_transactions.len() == 1 { Self::send_transaction(tpu_address, wire_transactions[0]) } else { Self::send_transactions_with_metrics(tpu_address, wire_transactions) + }; + + if let Err(err) = result { + warn!( + "Failed to send transaction transaction to {}: {:?}", + tpu_address, err + ); + stats.send_failure_count.fetch_add(1, Ordering::Relaxed); } + + measure.stop(); + stats.send_us.fetch_add(measure.as_us(), Ordering::Relaxed); + stats.send_attempt_count.fetch_add(1, Ordering::Relaxed); } fn get_tpu_addresses<'a, T: TpuInfo>( @@ -670,6 +830,7 @@ mod test { info!("Expired transactions are dropped..."); let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None))); + let stats = SendTransactionServiceStats::default(); transactions.insert( Signature::default(), TransactionInfo::new( @@ -688,6 +849,7 @@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert!(transactions.is_empty()); 
assert_eq!( @@ -717,6 +879,7 @@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert!(transactions.is_empty()); assert_eq!( @@ -746,6 +909,7 @@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert!(transactions.is_empty()); assert_eq!( @@ -775,6 +939,7 @@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -806,6 +971,7 @@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -847,6 +1013,7 @@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -864,6 +1031,7 @@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert!(transactions.is_empty()); assert_eq!( @@ -937,6 +1105,7 @@ mod test { ), ); let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None))); + let stats = SendTransactionServiceStats::default(); let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, @@ -944,6 +1113,7 @@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert!(transactions.is_empty()); assert_eq!( @@ -972,6 +1142,7 @@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert!(transactions.is_empty()); assert_eq!( @@ -1002,6 +1173,7 @@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert!(transactions.is_empty()); assert_eq!( @@ -1030,6 +1202,7 @@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert!(transactions.is_empty()); assert_eq!( @@ -1059,6 +1232,7 @@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert!(transactions.is_empty()); assert_eq!( @@ -1088,6 +1262,7 @@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -1119,6 +1294,7 
@@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -1147,6 +1323,7 @@ mod test { &mut transactions, &leader_info_provider, &config, + &stats, ); assert_eq!(transactions.len(), 0); assert_eq!(