investigate system performance test degradation (#17919)

* Add stats and counter around cost model ops, mainly:
- calculate transaction cost
- check transaction can fit in a block
- update block cost tracker after transactions are added to block
- replay_stage to update/insert execution cost to table

* Change mutex on cost_tracker to RwLock

* removed cloning cost_tracker for local use, as the metrics show clone is very expensive.

* acquire and hold locks for a block of TXs, instead of acquiring and releasing them per transaction

* remove redundant would_fit check from cost_tracker update execution path

* refactor cost checking with less frequent lock acquiring

* avoid many TransactionCost heap allocations when calculating cost, which
is in the hot path - executed per transaction.

* create hashmap using with_capacity to reduce runtime heap reallocation.

* code review changes: categorize stats, replace explicit drop calls, concisely initialize to default

* address potential deadlock by acquiring locks one at a time
This commit is contained in:
Tao Zhu 2021-06-28 21:34:04 -05:00 committed by GitHub
parent 47cafb70da
commit 9d6f1ebef4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 286 additions and 73 deletions

View File

@ -34,7 +34,7 @@ use solana_sdk::transaction::Transaction;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver; use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use test::Bencher; use test::Bencher;
@ -94,7 +94,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
&BankingStageStats::default(), &BankingStageStats::default(),
&recorder, &recorder,
&Arc::new(RwLock::new(CostModel::default())), &Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(std::u64::MAX, std::u64::MAX))), &Arc::new(RwLock::new(CostTracker::new(std::u64::MAX, std::u64::MAX))),
); );
}); });

View File

@ -1,7 +1,10 @@
//! The `banking_stage` processes Transaction messages. It is intended to be used //! The `banking_stage` processes Transaction messages. It is intended to be used
//! to construct a software pipeline. The stage uses all available CPU cores and //! to construct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU. //! can do its processing in parallel with signature verification on the GPU.
use crate::{cost_model::CostModel, cost_tracker::CostTracker, packet_hasher::PacketHasher}; use crate::{
cost_model::CostModel, cost_model::TransactionCost, cost_tracker::CostTracker,
packet_hasher::PacketHasher,
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools; use itertools::Itertools;
use lru::LruCache; use lru::LruCache;
@ -76,6 +79,8 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
const DEFAULT_LRU_SIZE: usize = 200_000; const DEFAULT_LRU_SIZE: usize = 200_000;
const MAX_WRITABLE_ACCOUNTS: usize = 256;
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct BankingStageStats { pub struct BankingStageStats {
last_report: AtomicU64, last_report: AtomicU64,
@ -87,6 +92,9 @@ pub struct BankingStageStats {
current_buffered_packets_count: AtomicUsize, current_buffered_packets_count: AtomicUsize,
rebuffered_packets_count: AtomicUsize, rebuffered_packets_count: AtomicUsize,
consumed_buffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize,
reset_cost_tracker_count: AtomicUsize,
cost_tracker_check_count: AtomicUsize,
cost_forced_retry_transactions_count: AtomicUsize,
// Timing // Timing
consume_buffered_packets_elapsed: AtomicU64, consume_buffered_packets_elapsed: AtomicU64,
@ -95,7 +103,11 @@ pub struct BankingStageStats {
filter_pending_packets_elapsed: AtomicU64, filter_pending_packets_elapsed: AtomicU64,
packet_duplicate_check_elapsed: AtomicU64, packet_duplicate_check_elapsed: AtomicU64,
packet_conversion_elapsed: AtomicU64, packet_conversion_elapsed: AtomicU64,
unprocessed_packet_conversion_elapsed: AtomicU64,
transaction_processing_elapsed: AtomicU64, transaction_processing_elapsed: AtomicU64,
cost_tracker_update_elapsed: AtomicU64,
cost_tracker_clone_elapsed: AtomicU64,
cost_tracker_check_elapsed: AtomicU64,
} }
impl BankingStageStats { impl BankingStageStats {
@ -154,6 +166,22 @@ impl BankingStageStats {
self.rebuffered_packets_count.swap(0, Ordering::Relaxed) as i64, self.rebuffered_packets_count.swap(0, Ordering::Relaxed) as i64,
i64 i64
), ),
(
"reset_cost_tracker_count",
self.reset_cost_tracker_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"cost_tracker_check_count",
self.cost_tracker_check_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"cost_forced_retry_transactions_count",
self.cost_forced_retry_transactions_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
( (
"consume_buffered_packets_elapsed", "consume_buffered_packets_elapsed",
self.consume_buffered_packets_elapsed self.consume_buffered_packets_elapsed
@ -188,12 +216,33 @@ impl BankingStageStats {
self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64, self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64,
i64 i64
), ),
(
"unprocessed_packet_conversion_elapsed",
self.unprocessed_packet_conversion_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
( (
"transaction_processing_elapsed", "transaction_processing_elapsed",
self.transaction_processing_elapsed self.transaction_processing_elapsed
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed) as i64,
i64 i64
), ),
(
"cost_tracker_update_elapsed",
self.cost_tracker_update_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"cost_tracker_clone_elapsed",
self.cost_tracker_clone_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"cost_tracker_check_elapsed",
self.cost_tracker_check_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
); );
} }
} }
@ -244,11 +293,16 @@ impl BankingStage {
gossip_vote_sender: ReplayVoteSender, gossip_vote_sender: ReplayVoteSender,
cost_model: &Arc<RwLock<CostModel>>, cost_model: &Arc<RwLock<CostModel>>,
) -> Self { ) -> Self {
// shared mutex guarded 'cost_tracker' tracks bank's cost against configured limits. // 'cost_tracker' tracks bank's cost against configured limits.
let cost_tracker = Arc::new(Mutex::new(CostTracker::new( let cost_tracker = {
cost_model.read().unwrap().get_account_cost_limit(), let cost_model = cost_model.read().unwrap();
cost_model.read().unwrap().get_block_cost_limit(), CostTracker::new(
))); cost_model.get_account_cost_limit(),
cost_model.get_block_cost_limit(),
)
};
let cost_tracker = Arc::new(RwLock::new(cost_tracker));
Self::new_num_threads( Self::new_num_threads(
cluster_info, cluster_info,
poh_recorder, poh_recorder,
@ -271,7 +325,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender, gossip_vote_sender: ReplayVoteSender,
cost_model: &Arc<RwLock<CostModel>>, cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>, cost_tracker: &Arc<RwLock<CostTracker>>,
) -> Self { ) -> Self {
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH); let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
// Single thread to generate entries from many banks. // Single thread to generate entries from many banks.
@ -364,8 +418,15 @@ impl BankingStage {
has_more_unprocessed_transactions has_more_unprocessed_transactions
} }
fn reset_cost_tracker_if_new_bank(cost_tracker: &Arc<Mutex<CostTracker>>, bank_slot: Slot) { fn reset_cost_tracker_if_new_bank(
cost_tracker.lock().unwrap().reset_if_new_bank(bank_slot); cost_tracker: &Arc<RwLock<CostTracker>>,
bank_slot: Slot,
banking_stage_stats: &BankingStageStats,
) {
cost_tracker.write().unwrap().reset_if_new_bank(bank_slot);
banking_stage_stats
.reset_cost_tracker_count
.fetch_add(1, Ordering::Relaxed);
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
@ -380,7 +441,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder, recorder: &TransactionRecorder,
cost_model: &Arc<RwLock<CostModel>>, cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>, cost_tracker: &Arc<RwLock<CostTracker>>,
) { ) {
let mut rebuffered_packets_len = 0; let mut rebuffered_packets_len = 0;
let mut new_tx_count = 0; let mut new_tx_count = 0;
@ -400,6 +461,7 @@ impl BankingStage {
*next_leader, *next_leader,
cost_model, cost_model,
cost_tracker, cost_tracker,
banking_stage_stats,
); );
Self::update_buffered_packets_with_new_unprocessed( Self::update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes, original_unprocessed_indexes,
@ -408,7 +470,11 @@ impl BankingStage {
} else { } else {
let bank_start = poh_recorder.lock().unwrap().bank_start(); let bank_start = poh_recorder.lock().unwrap().bank_start();
if let Some((bank, bank_creation_time)) = bank_start { if let Some((bank, bank_creation_time)) = bank_start {
Self::reset_cost_tracker_if_new_bank(cost_tracker, bank.slot()); Self::reset_cost_tracker_if_new_bank(
cost_tracker,
bank.slot(),
banking_stage_stats,
);
let (processed, verified_txs_len, new_unprocessed_indexes) = let (processed, verified_txs_len, new_unprocessed_indexes) =
Self::process_packets_transactions( Self::process_packets_transactions(
&bank, &bank,
@ -524,7 +590,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder, recorder: &TransactionRecorder,
cost_model: &Arc<RwLock<CostModel>>, cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>, cost_tracker: &Arc<RwLock<CostTracker>>,
) -> BufferedPacketsDecision { ) -> BufferedPacketsDecision {
let bank_start; let bank_start;
let ( let (
@ -536,7 +602,11 @@ impl BankingStage {
let poh = poh_recorder.lock().unwrap(); let poh = poh_recorder.lock().unwrap();
bank_start = poh.bank_start(); bank_start = poh.bank_start();
if let Some((ref bank, _)) = bank_start { if let Some((ref bank, _)) = bank_start {
Self::reset_cost_tracker_if_new_bank(cost_tracker, bank.slot()); Self::reset_cost_tracker_if_new_bank(
cost_tracker,
bank.slot(),
banking_stage_stats,
);
}; };
( (
poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET), poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET),
@ -641,7 +711,7 @@ impl BankingStage {
gossip_vote_sender: ReplayVoteSender, gossip_vote_sender: ReplayVoteSender,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>, duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
cost_model: &Arc<RwLock<CostModel>>, cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>, cost_tracker: &Arc<RwLock<CostTracker>>,
) { ) {
let recorder = poh_recorder.lock().unwrap().recorder(); let recorder = poh_recorder.lock().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -1033,20 +1103,18 @@ impl BankingStage {
// and verifies secp256k1 instructions. A list of valid transactions are returned with their message hashes // and verifies secp256k1 instructions. A list of valid transactions are returned with their message hashes
// and packet indexes. // and packet indexes.
// Also returned are the packet indexes of transactions that should be retried due to cost limits. // Also returned are the packet indexes of transactions that should be retried due to cost limits.
#[allow(clippy::needless_collect)]
fn transactions_from_packets( fn transactions_from_packets(
msgs: &Packets, msgs: &Packets,
transaction_indexes: &[usize], transaction_indexes: &[usize],
secp256k1_program_enabled: bool, secp256k1_program_enabled: bool,
cost_model: &Arc<RwLock<CostModel>>, cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>, cost_tracker: &Arc<RwLock<CostTracker>>,
banking_stage_stats: &BankingStageStats,
) -> (Vec<HashedTransaction<'static>>, Vec<usize>, Vec<usize>) { ) -> (Vec<HashedTransaction<'static>>, Vec<usize>, Vec<usize>) {
// Making a snapshot of shared cost_tracker by clone(), drop lock immediately.
// Local copy `cost_tracker` is used to filter transactions by cost.
// Shared cost_tracker is updated later by processed transactions confirmed by bank.
let mut cost_tracker = cost_tracker.lock().unwrap().clone();
let mut retryable_transaction_packet_indexes: Vec<usize> = vec![]; let mut retryable_transaction_packet_indexes: Vec<usize> = vec![];
let (filtered_transactions, filter_transaction_packet_indexes) = transaction_indexes
let verified_transactions_with_packet_indexes: Vec<_> = transaction_indexes
.iter() .iter()
.filter_map(|tx_index| { .filter_map(|tx_index| {
let p = &msgs.packets[*tx_index]; let p = &msgs.packets[*tx_index];
@ -1054,27 +1122,55 @@ impl BankingStage {
if secp256k1_program_enabled { if secp256k1_program_enabled {
tx.verify_precompiles().ok()?; tx.verify_precompiles().ok()?;
} }
Some((tx, *tx_index))
// Get transaction cost via cost_model; try to add cost to
// local copy of cost_tracker, if suceeded, local copy is updated
// and transaction added to valid list; otherwise, transaction is
// added to retry list. No locking here.
let tx_cost = cost_model.read().unwrap().calculate_cost(&tx);
let result = cost_tracker.try_add(tx_cost);
if result.is_err() {
debug!("transaction {:?} would exceed limit: {:?}", tx, result);
retryable_transaction_packet_indexes.push(*tx_index);
return None;
}
let message_bytes = Self::packet_message(p)?;
let message_hash = Message::hash_raw_message(message_bytes);
Some((
HashedTransaction::new(Cow::Owned(tx), message_hash),
tx_index,
))
}) })
.unzip(); .collect();
banking_stage_stats.cost_tracker_check_count.fetch_add(
verified_transactions_with_packet_indexes.len(),
Ordering::Relaxed,
);
let mut cost_tracker_check_time = Measure::start("cost_tracker_check_time");
let mut tx_cost = TransactionCost::new_with_capacity(MAX_WRITABLE_ACCOUNTS);
let filtered_transactions_with_packet_indexes: Vec<_> = {
let cost_model_readonly = cost_model.read().unwrap();
let cost_tracker_readonly = cost_tracker.read().unwrap();
verified_transactions_with_packet_indexes
.into_iter()
.filter_map(|(tx, tx_index)| {
cost_model_readonly.calculate_cost_no_alloc(&tx, &mut tx_cost);
let result = cost_tracker_readonly.would_fit(
&tx_cost.writable_accounts,
&(tx_cost.account_access_cost + tx_cost.execution_cost),
);
if result.is_err() {
debug!("transaction {:?} would exceed limit: {:?}", tx, result);
retryable_transaction_packet_indexes.push(tx_index);
return None;
}
Some((tx, tx_index))
})
.collect()
};
cost_tracker_check_time.stop();
let (filtered_transactions, filter_transaction_packet_indexes) =
filtered_transactions_with_packet_indexes
.into_iter()
.filter_map(|(tx, tx_index)| {
let p = &msgs.packets[tx_index];
let message_bytes = Self::packet_message(p)?;
let message_hash = Message::hash_raw_message(message_bytes);
Some((
HashedTransaction::new(Cow::Owned(tx), message_hash),
tx_index,
))
})
.unzip();
banking_stage_stats
.cost_tracker_check_elapsed
.fetch_add(cost_tracker_check_time.as_us(), Ordering::Relaxed);
( (
filtered_transactions, filtered_transactions,
@ -1133,7 +1229,7 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender, gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
cost_model: &Arc<RwLock<CostModel>>, cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>, cost_tracker: &Arc<RwLock<CostTracker>>,
) -> (usize, usize, Vec<usize>) { ) -> (usize, usize, Vec<usize>) {
let mut packet_conversion_time = Measure::start("packet_conversion"); let mut packet_conversion_time = Measure::start("packet_conversion");
let (transactions, transaction_to_packet_indexes, retryable_packet_indexes) = let (transactions, transaction_to_packet_indexes, retryable_packet_indexes) =
@ -1143,9 +1239,14 @@ impl BankingStage {
bank.secp256k1_program_enabled(), bank.secp256k1_program_enabled(),
cost_model, cost_model,
cost_tracker, cost_tracker,
banking_stage_stats,
); );
packet_conversion_time.stop(); packet_conversion_time.stop();
inc_new_counter_info!("banking_stage-packet_conversion", 1);
banking_stage_stats
.cost_forced_retry_transactions_count
.fetch_add(retryable_packet_indexes.len(), Ordering::Relaxed);
debug!( debug!(
"bank: {} filtered transactions {} cost limited transactions {}", "bank: {} filtered transactions {} cost limited transactions {}",
bank.slot(), bank.slot(),
@ -1166,16 +1267,31 @@ impl BankingStage {
); );
process_tx_time.stop(); process_tx_time.stop();
let unprocessed_tx_count = unprocessed_tx_indexes.len(); let unprocessed_tx_count = unprocessed_tx_indexes.len();
inc_new_counter_info!(
"banking_stage-unprocessed_transactions",
unprocessed_tx_count
);
// applying cost of processed transactions to shared cost_tracker // applying cost of processed transactions to shared cost_tracker
transactions.iter().enumerate().for_each(|(index, tx)| { let mut cost_tracking_time = Measure::start("cost_tracking_time");
if !unprocessed_tx_indexes.iter().any(|&i| i == index) { let mut tx_cost = TransactionCost::new_with_capacity(MAX_WRITABLE_ACCOUNTS);
let tx_cost = cost_model.read().unwrap().calculate_cost(tx.transaction()); {
let mut guard = cost_tracker.lock().unwrap(); //let cost_model_readonly = cost_model.read().unwrap();
let _result = guard.try_add(tx_cost); //let mut cost_tracker_mutable = cost_tracker.write().unwrap();
drop(guard); transactions.iter().enumerate().for_each(|(index, tx)| {
} if !unprocessed_tx_indexes.iter().any(|&i| i == index) {
}); cost_model
.read()
.unwrap()
.calculate_cost_no_alloc(tx.transaction(), &mut tx_cost);
cost_tracker.write().unwrap().add_transaction(
&tx_cost.writable_accounts,
&(tx_cost.account_access_cost + tx_cost.execution_cost),
);
}
});
}
cost_tracking_time.stop();
let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time"); let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time");
let mut filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( let mut filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs(
@ -1186,21 +1302,24 @@ impl BankingStage {
); );
filter_pending_packets_time.stop(); filter_pending_packets_time.stop();
// combine cost-related unprocessed transactions with bank determined unprocessed for
// buffering
filtered_unprocessed_packet_indexes.extend(retryable_packet_indexes);
inc_new_counter_info!( inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding", "banking_stage-dropped_tx_before_forwarding",
unprocessed_tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) unprocessed_tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len())
); );
// combine cost-related unprocessed transactions with bank determined unprocessed for
// buffering
filtered_unprocessed_packet_indexes.extend(retryable_packet_indexes);
banking_stage_stats banking_stage_stats
.packet_conversion_elapsed .packet_conversion_elapsed
.fetch_add(packet_conversion_time.as_us(), Ordering::Relaxed); .fetch_add(packet_conversion_time.as_us(), Ordering::Relaxed);
banking_stage_stats banking_stage_stats
.transaction_processing_elapsed .transaction_processing_elapsed
.fetch_add(process_tx_time.as_us(), Ordering::Relaxed); .fetch_add(process_tx_time.as_us(), Ordering::Relaxed);
banking_stage_stats
.cost_tracker_update_elapsed
.fetch_add(cost_tracking_time.as_us(), Ordering::Relaxed);
banking_stage_stats banking_stage_stats
.filter_pending_packets_elapsed .filter_pending_packets_elapsed
.fetch_add(filter_pending_packets_time.as_us(), Ordering::Relaxed); .fetch_add(filter_pending_packets_time.as_us(), Ordering::Relaxed);
@ -1215,7 +1334,8 @@ impl BankingStage {
my_pubkey: &Pubkey, my_pubkey: &Pubkey,
next_leader: Option<Pubkey>, next_leader: Option<Pubkey>,
cost_model: &Arc<RwLock<CostModel>>, cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>, cost_tracker: &Arc<RwLock<CostTracker>>,
banking_stage_stats: &BankingStageStats,
) -> Vec<usize> { ) -> Vec<usize> {
// Check if we are the next leader. If so, let's not filter the packets // Check if we are the next leader. If so, let's not filter the packets
// as we'll filter it again while processing the packets. // as we'll filter it again while processing the packets.
@ -1226,6 +1346,8 @@ impl BankingStage {
} }
} }
let mut unprocessed_packet_conversion_time =
Measure::start("unprocessed_packet_conversion");
let (transactions, transaction_to_packet_indexes, retry_packet_indexes) = let (transactions, transaction_to_packet_indexes, retry_packet_indexes) =
Self::transactions_from_packets( Self::transactions_from_packets(
msgs, msgs,
@ -1233,7 +1355,9 @@ impl BankingStage {
bank.secp256k1_program_enabled(), bank.secp256k1_program_enabled(),
cost_model, cost_model,
cost_tracker, cost_tracker,
banking_stage_stats,
); );
unprocessed_packet_conversion_time.stop();
let tx_count = transaction_to_packet_indexes.len(); let tx_count = transaction_to_packet_indexes.len();
@ -1251,6 +1375,12 @@ impl BankingStage {
"banking_stage-dropped_tx_before_forwarding", "banking_stage-dropped_tx_before_forwarding",
tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len())
); );
banking_stage_stats
.unprocessed_packet_conversion_elapsed
.fetch_add(
unprocessed_packet_conversion_time.as_us(),
Ordering::Relaxed,
);
filtered_unprocessed_packet_indexes filtered_unprocessed_packet_indexes
} }
@ -1287,7 +1417,7 @@ impl BankingStage {
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>, duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
recorder: &TransactionRecorder, recorder: &TransactionRecorder,
cost_model: &Arc<RwLock<CostModel>>, cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>, cost_tracker: &Arc<RwLock<CostTracker>>,
) -> Result<(), RecvTimeoutError> { ) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv"); let mut recv_time = Measure::start("process_packets_recv");
let mms = verified_receiver.recv_timeout(recv_timeout)?; let mms = verified_receiver.recv_timeout(recv_timeout)?;
@ -1326,7 +1456,7 @@ impl BankingStage {
continue; continue;
} }
let (bank, bank_creation_time) = bank_start.unwrap(); let (bank, bank_creation_time) = bank_start.unwrap();
Self::reset_cost_tracker_if_new_bank(cost_tracker, bank.slot()); Self::reset_cost_tracker_if_new_bank(cost_tracker, bank.slot(), banking_stage_stats);
let (processed, verified_txs_len, unprocessed_indexes) = let (processed, verified_txs_len, unprocessed_indexes) =
Self::process_packets_transactions( Self::process_packets_transactions(
@ -1372,6 +1502,7 @@ impl BankingStage {
next_leader, next_leader,
cost_model, cost_model,
cost_tracker, cost_tracker,
banking_stage_stats,
); );
Self::push_unprocessed( Self::push_unprocessed(
buffered_packets, buffered_packets,
@ -1835,7 +1966,7 @@ mod tests {
None, None,
gossip_vote_sender, gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())), &Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new( &Arc::new(RwLock::new(CostTracker::new(
ACCOUNT_MAX_COST, ACCOUNT_MAX_COST,
BLOCK_MAX_COST, BLOCK_MAX_COST,
))), ))),
@ -2660,7 +2791,7 @@ mod tests {
&BankingStageStats::default(), &BankingStageStats::default(),
&recorder, &recorder,
&Arc::new(RwLock::new(CostModel::default())), &Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new( &Arc::new(RwLock::new(CostTracker::new(
ACCOUNT_MAX_COST, ACCOUNT_MAX_COST,
BLOCK_MAX_COST, BLOCK_MAX_COST,
))), ))),
@ -2681,7 +2812,7 @@ mod tests {
&BankingStageStats::default(), &BankingStageStats::default(),
&recorder, &recorder,
&Arc::new(RwLock::new(CostModel::default())), &Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new( &Arc::new(RwLock::new(CostTracker::new(
ACCOUNT_MAX_COST, ACCOUNT_MAX_COST,
BLOCK_MAX_COST, BLOCK_MAX_COST,
))), ))),
@ -2751,7 +2882,7 @@ mod tests {
&BankingStageStats::default(), &BankingStageStats::default(),
&recorder, &recorder,
&Arc::new(RwLock::new(CostModel::default())), &Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new( &Arc::new(RwLock::new(CostTracker::new(
ACCOUNT_MAX_COST, ACCOUNT_MAX_COST,
BLOCK_MAX_COST, BLOCK_MAX_COST,
))), ))),

View File

@ -23,6 +23,8 @@ const NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST: u64 = 7;
pub const ACCOUNT_MAX_COST: u64 = 100_000_000; pub const ACCOUNT_MAX_COST: u64 = 100_000_000;
pub const BLOCK_MAX_COST: u64 = 2_500_000_000; pub const BLOCK_MAX_COST: u64 = 2_500_000_000;
const DEMOTE_SYSVAR_WRITE_LOCKS: bool = true;
// cost of transaction is made of account_access_cost and instruction execution_cost // cost of transaction is made of account_access_cost and instruction execution_cost
// where // where
// account_access_cost is the sum of read/write/sign all accounts included in the transaction // account_access_cost is the sum of read/write/sign all accounts included in the transaction
@ -36,6 +38,21 @@ pub struct TransactionCost {
pub execution_cost: u64, pub execution_cost: u64,
} }
impl TransactionCost {
    /// Creates a cost record whose `writable_accounts` vector is pre-allocated
    /// with room for `capacity` entries; every other field takes its
    /// `Default` value.
    pub fn new_with_capacity(capacity: usize) -> Self {
        let writable_accounts = Vec::with_capacity(capacity);
        Self {
            writable_accounts,
            ..Self::default()
        }
    }

    /// Zeroes both cost counters and empties `writable_accounts`.
    ///
    /// `Vec::clear` leaves the vector's allocated capacity untouched, so the
    /// same instance can be refilled without allocating again.
    pub fn reset(&mut self) {
        self.execution_cost = 0;
        self.account_access_cost = 0;
        self.writable_accounts.clear();
    }
}
#[derive(Debug)] #[derive(Debug)]
pub struct CostModel { pub struct CostModel {
account_cost_limit: u64, account_cost_limit: u64,
@ -90,6 +107,34 @@ impl CostModel {
cost cost
} }
// Computes the cost of `transaction` and hands it back through the mutable `cost`
// parameter. Whatever `cost` held before is wiped first, so the caller can keep
// reusing one pre-allocated `TransactionCost`; this matters because the function
// is often on the hot path.
pub fn calculate_cost_no_alloc(&self, transaction: &Transaction, cost: &mut TransactionCost) {
    cost.reset();

    let message = transaction.message();
    for (index, key) in message.account_keys.iter().enumerate() {
        let signer = message.is_signer(index);
        let writable = message.is_writable(index, DEMOTE_SYSVAR_WRITE_LOCKS);

        // Only writable accounts are recorded in `writable_accounts`.
        if writable {
            cost.writable_accounts.push(*key);
        }

        // Every account contributes an access cost chosen by its signer/writable combination.
        cost.account_access_cost += match (signer, writable) {
            (true, true) => SIGNED_WRITABLE_ACCOUNT_ACCESS_COST,
            (true, false) => SIGNED_READONLY_ACCOUNT_ACCESS_COST,
            (false, true) => NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST,
            (false, false) => NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST,
        };
    }

    cost.execution_cost = self.find_transaction_cost(transaction);
    debug!("transaction {:?} has cost {:?}", transaction, cost);
}
// To update or insert instruction cost to table. // To update or insert instruction cost to table.
pub fn upsert_instruction_cost( pub fn upsert_instruction_cost(
&mut self, &mut self,
@ -400,6 +445,33 @@ mod tests {
assert_eq!(2, tx_cost.writable_accounts.len()); assert_eq!(2, tx_cost.writable_accounts.len());
} }
#[test]
fn test_cost_model_calculate_cost_no_alloc() {
    // Build a simple system transfer from the mint to a fresh keypair.
    let (mint_keypair, start_hash) = test_setup();
    let recipient = Keypair::new().pubkey();
    let transfer_tx = system_transaction::transfer(&mint_keypair, &recipient, 2, start_hash);

    // Expected access cost: one signed-writable, one unsigned-writable and one
    // unsigned-readonly account.
    let want_account_cost = SIGNED_WRITABLE_ACCOUNT_ACCESS_COST
        + NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST
        + NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST;
    let want_execution_cost = 8;

    let mut model = CostModel::default();
    model
        .upsert_instruction_cost(&system_program::id(), &want_execution_cost)
        .unwrap();

    // Pre-populate the reusable cost with junk to prove it is overwritten.
    let mut reused_cost = TransactionCost::new_with_capacity(8);
    reused_cost.execution_cost = 101;
    reused_cost.writable_accounts.push(Pubkey::new_unique());

    model.calculate_cost_no_alloc(&transfer_tx, &mut reused_cost);

    assert_eq!(want_account_cost, reused_cost.account_access_cost);
    assert_eq!(want_execution_cost, reused_cost.execution_cost);
    assert_eq!(2, reused_cost.writable_accounts.len());
}
#[test] #[test]
fn test_cost_model_update_instruction_cost() { fn test_cost_model_update_instruction_cost() {
let key1 = Pubkey::new_unique(); let key1 = Pubkey::new_unique();

View File

@ -5,6 +5,8 @@ use crate::cost_model::TransactionCost;
use solana_sdk::{clock::Slot, pubkey::Pubkey}; use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::collections::HashMap; use std::collections::HashMap;
const WRITABLE_ACCOUNTS_PER_BLOCK: usize = 512;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct CostTracker { pub struct CostTracker {
account_cost_limit: u64, account_cost_limit: u64,
@ -21,7 +23,7 @@ impl CostTracker {
account_cost_limit: chain_max, account_cost_limit: chain_max,
block_cost_limit: package_max, block_cost_limit: package_max,
current_bank_slot: 0, current_bank_slot: 0,
cost_by_writable_accounts: HashMap::new(), cost_by_writable_accounts: HashMap::with_capacity(WRITABLE_ACCOUNTS_PER_BLOCK),
block_cost: 0, block_cost: 0,
} }
} }
@ -42,7 +44,7 @@ impl CostTracker {
Ok(self.block_cost) Ok(self.block_cost)
} }
fn would_fit(&self, keys: &[Pubkey], cost: &u64) -> Result<(), &'static str> { pub fn would_fit(&self, keys: &[Pubkey], cost: &u64) -> Result<(), &'static str> {
// check against the total package cost // check against the total package cost
if self.block_cost + cost > self.block_cost_limit { if self.block_cost + cost > self.block_cost_limit {
return Err("would exceed block cost limit"); return Err("would exceed block cost limit");
@ -70,7 +72,7 @@ impl CostTracker {
Ok(()) Ok(())
} }
fn add_transaction(&mut self, keys: &[Pubkey], cost: &u64) { pub fn add_transaction(&mut self, keys: &[Pubkey], cost: &u64) {
for account_key in keys.iter() { for account_key in keys.iter() {
*self *self
.cost_by_writable_accounts .cost_by_writable_accounts

View File

@ -33,8 +33,8 @@ impl ExecuteCostTable {
pub fn new(cap: usize) -> Self { pub fn new(cap: usize) -> Self {
Self { Self {
capacity: cap, capacity: cap,
table: HashMap::new(), table: HashMap::with_capacity(cap),
occurrences: HashMap::new(), occurrences: HashMap::with_capacity(cap),
} }
} }

View File

@ -1720,10 +1720,6 @@ impl ReplayStage {
verify_recyclers, verify_recyclers,
); );
Self::update_cost_model(cost_model, &bank_progress.replay_stats.execute_timings); Self::update_cost_model(cost_model, &bank_progress.replay_stats.execute_timings);
debug!(
"after replayed into bank, updated cost model instruction cost table, current values: {:?}",
cost_model.read().unwrap().get_instruction_cost_table()
);
match replay_result { match replay_result {
Ok(replay_tx_count) => tx_count += replay_tx_count, Ok(replay_tx_count) => tx_count += replay_tx_count,
Err(err) => { Err(err) => {
@ -1915,6 +1911,7 @@ impl ReplayStage {
} }
fn update_cost_model(cost_model: &RwLock<CostModel>, execute_timings: &ExecuteTimings) { fn update_cost_model(cost_model: &RwLock<CostModel>, execute_timings: &ExecuteTimings) {
let mut update_cost_model_time = Measure::start("update_cost_model_time");
let mut cost_model_mutable = cost_model.write().unwrap(); let mut cost_model_mutable = cost_model.write().unwrap();
for (program_id, stats) in &execute_timings.details.per_program_timings { for (program_id, stats) in &execute_timings.details.per_program_timings {
let cost = stats.0 / stats.1 as u64; let cost = stats.0 / stats.1 as u64;
@ -1937,7 +1934,18 @@ impl ReplayStage {
debug!( debug!(
"after replayed into bank, updated cost model instruction cost table, current values: {:?}", "after replayed into bank, updated cost model instruction cost table, current values: {:?}",
cost_model.read().unwrap().get_instruction_cost_table() cost_model.read().unwrap().get_instruction_cost_table()
); );
update_cost_model_time.stop();
inc_new_counter_info!("replay_stage-update_cost_model", 1);
datapoint_info!(
"replay-loop-timing-stats",
(
"update_cost_model_elapsed",
update_cost_model_time.as_us() as i64,
i64
)
);
} }
fn update_propagation_status( fn update_propagation_status(