TransactionScheduler: Clean already processed or old transactions from container (#34233)
This commit is contained in:
parent
0a2ff8525a
commit
df8893772e
|
@ -16,10 +16,12 @@ use {
|
|||
TOTAL_BUFFERED_PACKETS,
|
||||
},
|
||||
crossbeam_channel::RecvTimeoutError,
|
||||
solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics,
|
||||
solana_measure::measure_us,
|
||||
solana_runtime::bank_forks::BankForks,
|
||||
solana_sdk::{
|
||||
saturating_add_assign, timing::AtomicInterval, transaction::SanitizedTransaction,
|
||||
clock::MAX_PROCESSING_AGE, saturating_add_assign, timing::AtomicInterval,
|
||||
transaction::SanitizedTransaction,
|
||||
},
|
||||
std::{
|
||||
sync::{Arc, RwLock},
|
||||
|
@ -128,7 +130,11 @@ impl SchedulerController {
|
|||
let (_, clear_time_us) = measure_us!(self.clear_container());
|
||||
saturating_add_assign!(self.timing_metrics.clear_time_us, clear_time_us);
|
||||
}
|
||||
BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => {}
|
||||
BufferedPacketsDecision::ForwardAndHold => {
|
||||
let (_, clean_time_us) = measure_us!(self.clean_queue());
|
||||
saturating_add_assign!(self.timing_metrics.clean_time_us, clean_time_us);
|
||||
}
|
||||
BufferedPacketsDecision::Hold => {}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -143,6 +149,53 @@ impl SchedulerController {
|
|||
}
|
||||
}
|
||||
|
||||
/// Clean unprocessable transactions from the queue. These will be transactions that are
|
||||
/// expired, already processed, or are no longer sanitizable.
|
||||
/// This only clears pending transactions, and does **not** clear in-flight transactions.
|
||||
fn clean_queue(&mut self) {
|
||||
// Clean up any transactions that have already been processed, are too old, or do not have
|
||||
// valid nonce accounts.
|
||||
const MAX_TRANSACTION_CHECKS: usize = 10_000;
|
||||
let mut transaction_ids = Vec::with_capacity(MAX_TRANSACTION_CHECKS);
|
||||
|
||||
while let Some(id) = self.container.pop() {
|
||||
transaction_ids.push(id);
|
||||
}
|
||||
|
||||
let bank = self.bank_forks.read().unwrap().working_bank();
|
||||
|
||||
const CHUNK_SIZE: usize = 128;
|
||||
let mut error_counters = TransactionErrorMetrics::default();
|
||||
|
||||
for chunk in transaction_ids.chunks(CHUNK_SIZE) {
|
||||
let lock_results = vec![Ok(()); chunk.len()];
|
||||
let sanitized_txs: Vec<_> = chunk
|
||||
.iter()
|
||||
.map(|id| {
|
||||
&self
|
||||
.container
|
||||
.get_transaction_ttl(&id.id)
|
||||
.expect("transaction must exist")
|
||||
.transaction
|
||||
})
|
||||
.collect();
|
||||
|
||||
let check_results = bank.check_transactions(
|
||||
&sanitized_txs,
|
||||
&lock_results,
|
||||
MAX_PROCESSING_AGE,
|
||||
&mut error_counters,
|
||||
);
|
||||
|
||||
for ((result, _nonce), id) in check_results.into_iter().zip(chunk.iter()) {
|
||||
if result.is_err() {
|
||||
saturating_add_assign!(self.count_metrics.num_dropped_on_age_and_status, 1);
|
||||
self.container.remove_by_id(&id.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Receives completed transactions from the workers and updates metrics.
|
||||
fn receive_completed(&mut self) -> Result<(), SchedulerError> {
|
||||
let ((num_transactions, num_retryable), receive_completed_time_us) =
|
||||
|
@ -275,6 +328,8 @@ struct SchedulerCountMetrics {
|
|||
num_dropped_on_validate_locks: usize,
|
||||
/// Number of transactions that were dropped due to clearing.
|
||||
num_dropped_on_clear: usize,
|
||||
/// Number of transactions that were dropped due to age and status checks.
|
||||
num_dropped_on_age_and_status: usize,
|
||||
/// Number of transactions that were dropped due to exceeded capacity.
|
||||
num_dropped_on_capacity: usize,
|
||||
}
|
||||
|
@ -311,6 +366,11 @@ impl SchedulerCountMetrics {
|
|||
i64
|
||||
),
|
||||
("num_dropped_on_clear", self.num_dropped_on_clear, i64),
|
||||
(
|
||||
"num_dropped_on_age_and_status",
|
||||
self.num_dropped_on_age_and_status,
|
||||
i64
|
||||
),
|
||||
("num_dropped_on_capacity", self.num_dropped_on_capacity, i64)
|
||||
);
|
||||
}
|
||||
|
@ -326,6 +386,7 @@ impl SchedulerCountMetrics {
|
|||
|| self.num_dropped_on_sanitization != 0
|
||||
|| self.num_dropped_on_validate_locks != 0
|
||||
|| self.num_dropped_on_clear != 0
|
||||
|| self.num_dropped_on_age_and_status != 0
|
||||
|| self.num_dropped_on_capacity != 0
|
||||
}
|
||||
|
||||
|
@ -340,6 +401,7 @@ impl SchedulerCountMetrics {
|
|||
self.num_dropped_on_sanitization = 0;
|
||||
self.num_dropped_on_validate_locks = 0;
|
||||
self.num_dropped_on_clear = 0;
|
||||
self.num_dropped_on_age_and_status = 0;
|
||||
self.num_dropped_on_capacity = 0;
|
||||
}
|
||||
}
|
||||
|
@ -357,6 +419,8 @@ struct SchedulerTimingMetrics {
|
|||
schedule_time_us: u64,
|
||||
/// Time spent clearing transactions from the container.
|
||||
clear_time_us: u64,
|
||||
/// Time spent cleaning expired or processed transactions from the container.
|
||||
clean_time_us: u64,
|
||||
/// Time spent receiving completed transactions.
|
||||
receive_completed_time_us: u64,
|
||||
}
|
||||
|
@ -380,6 +444,7 @@ impl SchedulerTimingMetrics {
|
|||
("buffer_time", self.buffer_time_us, i64),
|
||||
("schedule_time", self.schedule_time_us, i64),
|
||||
("clear_time", self.clear_time_us, i64),
|
||||
("clean_time", self.clean_time_us, i64),
|
||||
(
|
||||
"receive_completed_time",
|
||||
self.receive_completed_time_us,
|
||||
|
@ -389,9 +454,12 @@ impl SchedulerTimingMetrics {
|
|||
}
|
||||
|
||||
fn reset(&mut self) {
|
||||
self.decision_time_us = 0;
|
||||
self.receive_time_us = 0;
|
||||
self.buffer_time_us = 0;
|
||||
self.schedule_time_us = 0;
|
||||
self.clear_time_us = 0;
|
||||
self.clean_time_us = 0;
|
||||
self.receive_completed_time_us = 0;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue