Make CostTracker aware of inflight transactions (#437)

When a leader is packing a Bank, transaction costs are added to the
CostTracker and then later updated or removed, depending on whether the
tx is committed. However, it is possible for a Bank to be frozen while
several tx's are still in flight.

CostUpdateService submits a metric with cost information almost
immediately after a Bank has been frozen. The result is that we have
observed cost details being submitted before some cost removals take
place, which causes a massive over-reporting of the block cost
compared to the actual cost.

This PR adds a field to track the number of transactions that are
inflight, and adds a simple mechanism to try to allow that value to
settle to zero before submitting the datapoint. The number of inflight
tx's is submitted with the datapoint, so even if the value does not
settle to zero, we can still detect this case and know the metric is
tainted.

Co-authored-by: Andrew Fitzgerald <apfitzge@gmail.com>
This commit is contained in:
steviez 2024-03-29 07:34:12 -05:00 committed by GitHub
parent d140cdbeab
commit 9076348ef4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 63 additions and 1 deletions

View File

@ -120,6 +120,7 @@ impl QosService {
} }
}) })
.collect(); .collect();
cost_tracker.add_transactions_in_flight(num_included);
cost_tracking_time.stop(); cost_tracking_time.stop();
self.metrics self.metrics
@ -167,17 +168,20 @@ impl QosService {
bank: &Bank, bank: &Bank,
) { ) {
let mut cost_tracker = bank.write_cost_tracker().unwrap(); let mut cost_tracker = bank.write_cost_tracker().unwrap();
let mut num_included = 0;
transaction_cost_results transaction_cost_results
.zip(transaction_committed_status) .zip(transaction_committed_status)
.for_each(|(tx_cost, transaction_committed_details)| { .for_each(|(tx_cost, transaction_committed_details)| {
// Only transactions that the qos service included have to be // Only transactions that the qos service included have to be
// checked for update // checked for update
if let Ok(tx_cost) = tx_cost { if let Ok(tx_cost) = tx_cost {
num_included += 1;
if *transaction_committed_details == CommitTransactionDetails::NotCommitted { if *transaction_committed_details == CommitTransactionDetails::NotCommitted {
cost_tracker.remove(tx_cost) cost_tracker.remove(tx_cost)
} }
} }
}); });
cost_tracker.sub_transactions_in_flight(num_included);
} }
fn update_committed_transaction_costs<'a>( fn update_committed_transaction_costs<'a>(
@ -206,13 +210,16 @@ impl QosService {
bank: &Bank, bank: &Bank,
) { ) {
let mut cost_tracker = bank.write_cost_tracker().unwrap(); let mut cost_tracker = bank.write_cost_tracker().unwrap();
let mut num_included = 0;
transaction_cost_results.for_each(|tx_cost| { transaction_cost_results.for_each(|tx_cost| {
// Only transactions that the qos service included have to be // Only transactions that the qos service included have to be
// removed // removed
if let Ok(tx_cost) = tx_cost { if let Ok(tx_cost) = tx_cost {
num_included += 1;
cost_tracker.remove(tx_cost); cost_tracker.remove(tx_cost);
} }
}); });
cost_tracker.sub_transactions_in_flight(num_included);
} }
// metrics are reported by bank slot // metrics are reported by bank slot

View File

@ -7,6 +7,7 @@ use {
std::{ std::{
sync::Arc, sync::Arc,
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
time::Duration,
}, },
}; };
pub enum CostUpdate { pub enum CostUpdate {
@ -19,6 +20,12 @@ pub struct CostUpdateService {
thread_hdl: JoinHandle<()>, thread_hdl: JoinHandle<()>,
} }
// The maximum number of times to check whether CostTracker::in_flight_transaction_count() has
// settled to zero before reporting stats for a frozen bank. On the final check the stats are
// reported regardless of the count; the in-flight count is reported with them, so this is ok.
const MAX_LOOP_COUNT: usize = 25;
// Sleep this long between checks to avoid excessive polling. With MAX_LOOP_COUNT checks, the
// total time spent sleeping is bounded by (MAX_LOOP_COUNT - 1) * LOOP_LIMITER, i.e. 240ms.
const LOOP_LIMITER: Duration = Duration::from_millis(10);
impl CostUpdateService { impl CostUpdateService {
pub fn new(blockstore: Arc<Blockstore>, cost_update_receiver: CostUpdateReceiver) -> Self { pub fn new(blockstore: Arc<Blockstore>, cost_update_receiver: CostUpdateReceiver) -> Self {
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
@ -39,7 +46,28 @@ impl CostUpdateService {
for cost_update in cost_update_receiver.iter() { for cost_update in cost_update_receiver.iter() {
match cost_update { match cost_update {
CostUpdate::FrozenBank { bank } => { CostUpdate::FrozenBank { bank } => {
bank.read_cost_tracker().unwrap().report_stats(bank.slot()); for loop_count in 1..=MAX_LOOP_COUNT {
{
// Hold the read lock only within this inner scope so that it is
// released before sleeping; otherwise the thread that updates the
// in-flight count would be unable to obtain the write lock while
// this thread sleeps.
let cost_tracker = bank.read_cost_tracker().unwrap();
let in_flight_transaction_count =
cost_tracker.in_flight_transaction_count();
if in_flight_transaction_count == 0 || loop_count == MAX_LOOP_COUNT {
let slot = bank.slot();
trace!(
"inflight transaction count is {in_flight_transaction_count} \
for slot {slot} after {loop_count} iteration(s)"
);
cost_tracker.report_stats(slot);
break;
}
}
std::thread::sleep(LOOP_LIMITER);
}
} }
} }
} }

View File

@ -61,6 +61,10 @@ pub struct CostTracker {
transaction_signature_count: u64, transaction_signature_count: u64,
secp256k1_instruction_signature_count: u64, secp256k1_instruction_signature_count: u64,
ed25519_instruction_signature_count: u64, ed25519_instruction_signature_count: u64,
/// The number of transactions that have had their estimated cost added to
/// the tracker, but are still waiting for an update with actual usage or
/// removal if the transaction does not end up getting committed.
in_flight_transaction_count: usize,
} }
impl Default for CostTracker { impl Default for CostTracker {
@ -83,6 +87,7 @@ impl Default for CostTracker {
transaction_signature_count: 0, transaction_signature_count: 0,
secp256k1_instruction_signature_count: 0, secp256k1_instruction_signature_count: 0,
ed25519_instruction_signature_count: 0, ed25519_instruction_signature_count: 0,
in_flight_transaction_count: 0,
} }
} }
} }
@ -100,6 +105,23 @@ impl CostTracker {
self.vote_cost_limit = vote_cost_limit; self.vote_cost_limit = vote_cost_limit;
} }
/// Returns the current number of in-flight transactions: those whose
/// estimated cost has been added to the tracker but which have not yet been
/// updated with actual usage or removed.
pub fn in_flight_transaction_count(&self) -> usize {
self.in_flight_transaction_count
}
pub fn add_transactions_in_flight(&mut self, in_flight_transaction_count: usize) {
saturating_add_assign!(
self.in_flight_transaction_count,
in_flight_transaction_count
);
}
/// Decreases the in-flight transaction counter by `in_flight_transaction_count`,
/// saturating at zero instead of underflowing.
pub fn sub_transactions_in_flight(&mut self, in_flight_transaction_count: usize) {
    let remaining = usize::saturating_sub(
        self.in_flight_transaction_count,
        in_flight_transaction_count,
    );
    self.in_flight_transaction_count = remaining;
}
pub fn try_add(&mut self, tx_cost: &TransactionCost) -> Result<u64, CostTrackerError> { pub fn try_add(&mut self, tx_cost: &TransactionCost) -> Result<u64, CostTrackerError> {
self.would_fit(tx_cost)?; self.would_fit(tx_cost)?;
self.add_transaction_cost(tx_cost); self.add_transaction_cost(tx_cost);
@ -174,6 +196,11 @@ impl CostTracker {
self.ed25519_instruction_signature_count, self.ed25519_instruction_signature_count,
i64 i64
), ),
(
"inflight_transaction_count",
self.in_flight_transaction_count,
i64
),
); );
} }