diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 5134ded31d..0baf490472 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1,7 +1,10 @@ //! The `banking_stage` processes Transaction messages. It is intended to be used //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. -use crate::packet_hasher::PacketHasher; +use crate::{ + packet_hasher::PacketHasher, + qos_service::{QosService, QosServiceStats}, +}; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use itertools::Itertools; use lru::LruCache; @@ -26,7 +29,6 @@ use solana_runtime::{ }, bank_utils, cost_model::CostModel, - cost_tracker::CostTracker, transaction_batch::TransactionBatch, vote_sender_types::ReplayVoteSender, }; @@ -55,7 +57,7 @@ use std::{ net::{SocketAddr, UdpSocket}, ops::DerefMut, sync::atomic::{AtomicU64, AtomicUsize, Ordering}, - sync::{Arc, Mutex, RwLock, RwLockReadGuard}, + sync::{Arc, Mutex, RwLock}, thread::{self, Builder, JoinHandle}, time::Duration, time::Instant, @@ -97,8 +99,6 @@ pub struct BankingStageStats { current_buffered_packet_batches_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, - cost_tracker_check_count: AtomicUsize, - cost_forced_retry_transactions_count: AtomicUsize, // Timing consume_buffered_packets_elapsed: AtomicU64, @@ -109,9 +109,6 @@ pub struct BankingStageStats { packet_conversion_elapsed: AtomicU64, unprocessed_packet_conversion_elapsed: AtomicU64, transaction_processing_elapsed: AtomicU64, - cost_tracker_update_elapsed: AtomicU64, - cost_tracker_clone_elapsed: AtomicU64, - cost_tracker_check_elapsed: AtomicU64, } impl BankingStageStats { @@ -181,17 +178,6 @@ impl BankingStageStats { .swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "cost_tracker_check_count", - self.cost_tracker_check_count.swap(0, Ordering::Relaxed) as i64, - i64 - ), - ( - "cost_forced_retry_transactions_count", - self.cost_forced_retry_transactions_count - .swap(0, Ordering::Relaxed) as i64, - i64 - ), ( "consume_buffered_packets_elapsed", self.consume_buffered_packets_elapsed @@ -238,21 +224,6 @@ impl BankingStageStats { .swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "cost_tracker_update_elapsed", - self.cost_tracker_update_elapsed.swap(0, Ordering::Relaxed) as i64, - i64 - ), - ( - "cost_tracker_clone_elapsed", - self.cost_tracker_clone_elapsed.swap(0, Ordering::Relaxed) as i64, - i64 - ), - ( - "cost_tracker_check_elapsed", - self.cost_tracker_check_elapsed.swap(0, Ordering::Relaxed) as i64, - i64 - ), ); } } @@ -457,7 +428,6 @@ impl BankingStage { my_pubkey, *next_leader, banking_stage_stats, - cost_model, ); Self::update_buffered_packets_with_new_unprocessed( original_unprocessed_indexes, @@ -960,13 +930,32 @@ impl BankingStage { chunk_offset: usize, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, + cost_model: &Arc>, ) -> (Result, Vec) { - let mut lock_time = Measure::start("lock_time"); + let mut qos_service_stats = QosServiceStats::default(); + let qos_service = QosService::new(cost_model.clone()); + let tx_costs = qos_service.compute_transaction_costs( + txs.iter(), + bank.demote_program_write_locks(), + &mut qos_service_stats, + ); + + let transactions_qos_results = qos_service.select_transactions_per_cost( + txs.iter(), + tx_costs.iter(), + bank, + &mut qos_service_stats, + ); + + // Only lock accounts for those transactions are selected for the block; // Once accounts are locked, other threads cannot encode transactions that will modify the // same account state - let batch = bank.prepare_sanitized_batch(txs); + let mut lock_time = Measure::start("lock_time"); + let batch = bank.prepare_sanitized_batch_with_results(txs, transactions_qos_results.iter()); lock_time.stop(); + // retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit and + // WouldExceedMaxAccountCostLimit let (result, mut retryable_txs) = Self::process_and_record_transactions_locked( bank, poh, @@ -989,6 +978,8 @@ impl BankingStage { txs.len(), ); + qos_service_stats.report(); + (result, retryable_txs) } @@ -1003,6 +994,7 @@ impl BankingStage { poh: &TransactionRecorder, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, + cost_model: &Arc>, ) -> (usize, Vec) { let mut chunk_start = 0; let mut unprocessed_txs = vec![]; @@ -1019,6 +1011,7 @@ impl BankingStage { chunk_start, transaction_status_sender.clone(), gossip_vote_sender, + cost_model, ); trace!("process_transactions result: {:?}", result); @@ -1087,24 +1080,17 @@ impl BankingStage { Some(&packet.data[msg_start..msg_end]) } - // This function deserializes packets into transactions, computes the blake3 hash of transaction messages, - // and verifies secp256k1 instructions. A list of valid transactions are returned with their message hashes - // and packet indexes. - // Also returned is packet indexes for transaction should be retried due to cost limits. + // This function deserializes packets into transactions, computes the blake3 hash of transaction + // messages, and verifies secp256k1 instructions. A list of sanitized transactions are returned + // with their packet indexes. #[allow(clippy::needless_collect)] fn transactions_from_packets( msgs: &Packets, transaction_indexes: &[usize], feature_set: &Arc, - read_cost_tracker: &RwLockReadGuard, - banking_stage_stats: &BankingStageStats, - demote_program_write_locks: bool, votes_only: bool, - cost_model: &Arc>, - ) -> (Vec, Vec, Vec) { - let mut retryable_transaction_packet_indexes: Vec = vec![]; - - let verified_transactions_with_packet_indexes: Vec<_> = transaction_indexes + ) -> (Vec, Vec) { + transaction_indexes .iter() .filter_map(|tx_index| { let p = &msgs.packets[*tx_index]; @@ -1125,51 +1111,7 @@ impl BankingStage { tx.verify_precompiles(feature_set).ok()?; Some((tx, *tx_index)) }) - .collect(); - banking_stage_stats.cost_tracker_check_count.fetch_add( - verified_transactions_with_packet_indexes.len(), - Ordering::Relaxed, - ); - - let mut cost_tracker_check_time = Measure::start("cost_tracker_check_time"); - let (filtered_transactions, filter_transaction_packet_indexes) = { - verified_transactions_with_packet_indexes - .into_iter() - .filter_map(|(tx, tx_index)| { - // excluding vote TX from cost_model, for now - let is_vote = &msgs.packets[tx_index].meta.is_simple_vote_tx; - if !is_vote - && read_cost_tracker - .would_transaction_fit( - &tx, - &cost_model - .read() - .unwrap() - .calculate_cost(&tx, demote_program_write_locks), - ) - .is_err() - { - // put transaction into retry queue if it wouldn't fit - // into current bank - debug!("transaction {:?} would exceed limit", tx); - retryable_transaction_packet_indexes.push(tx_index); - return None; - } - Some((tx, tx_index)) - }) - .unzip() - }; - cost_tracker_check_time.stop(); - - banking_stage_stats - .cost_tracker_check_elapsed - .fetch_add(cost_tracker_check_time.as_us(), Ordering::Relaxed); - - ( - filtered_transactions, - filter_transaction_packet_indexes, - retryable_transaction_packet_indexes, - ) + .unzip() } /// This function filters pending packets that are still valid @@ -1224,30 +1166,15 @@ impl BankingStage { cost_model: &Arc>, ) -> (usize, usize, Vec) { let mut packet_conversion_time = Measure::start("packet_conversion"); - let (transactions, transaction_to_packet_indexes, retryable_packet_indexes) = - Self::transactions_from_packets( - msgs, - &packet_indexes, - &bank.feature_set, - &bank.read_cost_tracker().unwrap(), - banking_stage_stats, - bank.demote_program_write_locks(), - bank.vote_only_bank(), - cost_model, - ); + let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets( + msgs, + &packet_indexes, + &bank.feature_set, + bank.vote_only_bank(), + ); packet_conversion_time.stop(); inc_new_counter_info!("banking_stage-packet_conversion", 1); - banking_stage_stats - .cost_forced_retry_transactions_count - .fetch_add(retryable_packet_indexes.len(), Ordering::Relaxed); - debug!( - "bank: {} filtered transactions {} cost limited transactions {}", - bank.slot(), - transactions.len(), - retryable_packet_indexes.len() - ); - let tx_len = transactions.len(); let mut process_tx_time = Measure::start("process_tx_time"); @@ -1258,6 +1185,7 @@ impl BankingStage { poh, transaction_status_sender, gossip_vote_sender, + cost_model, ); process_tx_time.stop(); let unprocessed_tx_count = unprocessed_tx_indexes.len(); @@ -1266,23 +1194,8 @@ impl BankingStage { unprocessed_tx_count ); - // applying cost of processed transactions to shared cost_tracker - let mut cost_tracking_time = Measure::start("cost_tracking_time"); - transactions.iter().enumerate().for_each(|(index, tx)| { - if unprocessed_tx_indexes.iter().all(|&i| i != index) { - bank.write_cost_tracker().unwrap().add_transaction_cost( - tx, - &cost_model - .read() - .unwrap() - .calculate_cost(tx, bank.demote_program_write_locks()), - ); - } - }); - cost_tracking_time.stop(); - let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time"); - let mut filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( + let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( bank, &transactions, &transaction_to_packet_indexes, @@ -1295,19 +1208,12 @@ impl BankingStage { unprocessed_tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) ); - // combine cost-related unprocessed transactions with bank determined unprocessed for - // buffering - filtered_unprocessed_packet_indexes.extend(retryable_packet_indexes); - banking_stage_stats .packet_conversion_elapsed .fetch_add(packet_conversion_time.as_us(), Ordering::Relaxed); banking_stage_stats .transaction_processing_elapsed .fetch_add(process_tx_time.as_us(), Ordering::Relaxed); - banking_stage_stats - .cost_tracker_update_elapsed - .fetch_add(cost_tracking_time.as_us(), Ordering::Relaxed); banking_stage_stats .filter_pending_packets_elapsed .fetch_add(filter_pending_packets_time.as_us(), Ordering::Relaxed); @@ -1322,7 +1228,6 @@ impl BankingStage { my_pubkey: &Pubkey, next_leader: Option, banking_stage_stats: &BankingStageStats, - cost_model: &Arc>, ) -> Vec { // Check if we are the next leader. If so, let's not filter the packets // as we'll filter it again while processing the packets. @@ -1335,31 +1240,24 @@ impl BankingStage { let mut unprocessed_packet_conversion_time = Measure::start("unprocessed_packet_conversion"); - let (transactions, transaction_to_packet_indexes, retry_packet_indexes) = - Self::transactions_from_packets( - msgs, - transaction_indexes, - &bank.feature_set, - &bank.read_cost_tracker().unwrap(), - banking_stage_stats, - bank.demote_program_write_locks(), - bank.vote_only_bank(), - cost_model, - ); + let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets( + msgs, + transaction_indexes, + &bank.feature_set, + bank.vote_only_bank(), + ); unprocessed_packet_conversion_time.stop(); let tx_count = transaction_to_packet_indexes.len(); let unprocessed_tx_indexes = (0..transactions.len()).collect_vec(); - let mut filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( + let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( bank, &transactions, &transaction_to_packet_indexes, &unprocessed_tx_indexes, ); - filtered_unprocessed_packet_indexes.extend(retry_packet_indexes); - inc_new_counter_info!( "banking_stage-dropped_tx_before_forwarding", tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) @@ -1496,7 +1394,6 @@ impl BankingStage { my_pubkey, next_leader, banking_stage_stats, - cost_model, ); Self::push_unprocessed( buffered_packets, @@ -2342,6 +2239,7 @@ mod tests { 0, None, &gossip_vote_sender, + &Arc::new(RwLock::new(CostModel::default())), ) .0 .unwrap(); @@ -2383,6 +2281,7 @@ mod tests { 0, None, &gossip_vote_sender, + &Arc::new(RwLock::new(CostModel::default())), ) .0, Err(PohRecorderError::MaxHeightReached) @@ -2470,6 +2369,7 @@ mod tests { 0, None, &gossip_vote_sender, + &Arc::new(RwLock::new(CostModel::default())), ); poh_recorder @@ -2578,6 +2478,7 @@ mod tests { &recorder, None, &gossip_vote_sender, + &Arc::new(RwLock::new(CostModel::default())), ); assert_eq!(processed_transactions_count, 0,); @@ -2670,6 +2571,7 @@ mod tests { enable_cpi_and_log_storage: false, }), &gossip_vote_sender, + &Arc::new(RwLock::new(CostModel::default())), ); transaction_status_service.join().unwrap(); @@ -3130,32 +3032,22 @@ mod tests { make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes); let mut votes_only = false; - let (txs, tx_packet_index, _retryable_packet_indexes) = - BankingStage::transactions_from_packets( - &packets, - &packet_indexes, - &Arc::new(FeatureSet::default()), - &RwLock::new(CostTracker::default()).read().unwrap(), - &BankingStageStats::default(), - false, - votes_only, - &Arc::new(RwLock::new(CostModel::default())), - ); + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + &Arc::new(FeatureSet::default()), + votes_only, + ); assert_eq!(2, txs.len()); assert_eq!(vec![0, 1], tx_packet_index); votes_only = true; - let (txs, tx_packet_index, _retryable_packet_indexes) = - BankingStage::transactions_from_packets( - &packets, - &packet_indexes, - &Arc::new(FeatureSet::default()), - &RwLock::new(CostTracker::default()).read().unwrap(), - &BankingStageStats::default(), - false, - votes_only, - &Arc::new(RwLock::new(CostModel::default())), - ); + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + &Arc::new(FeatureSet::default()), + votes_only, + ); assert_eq!(0, txs.len()); assert_eq!(0, tx_packet_index.len()); } @@ -3169,32 +3061,22 @@ mod tests { ); let mut votes_only = false; - let (txs, tx_packet_index, _retryable_packet_indexes) = - BankingStage::transactions_from_packets( - &packets, - &packet_indexes, - &Arc::new(FeatureSet::default()), - &RwLock::new(CostTracker::default()).read().unwrap(), - &BankingStageStats::default(), - false, - votes_only, - &Arc::new(RwLock::new(CostModel::default())), - ); + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + &Arc::new(FeatureSet::default()), + votes_only, + ); assert_eq!(3, txs.len()); assert_eq!(vec![0, 1, 2], tx_packet_index); votes_only = true; - let (txs, tx_packet_index, _retryable_packet_indexes) = - BankingStage::transactions_from_packets( - &packets, - &packet_indexes, - &Arc::new(FeatureSet::default()), - &RwLock::new(CostTracker::default()).read().unwrap(), - &BankingStageStats::default(), - false, - votes_only, - &Arc::new(RwLock::new(CostModel::default())), - ); + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + &Arc::new(FeatureSet::default()), + votes_only, + ); assert_eq!(2, txs.len()); assert_eq!(vec![0, 2], tx_packet_index); } @@ -3208,32 +3090,22 @@ mod tests { ); let mut votes_only = false; - let (txs, tx_packet_index, _retryable_packet_indexes) = - BankingStage::transactions_from_packets( - &packets, - &packet_indexes, - &Arc::new(FeatureSet::default()), - &RwLock::new(CostTracker::default()).read().unwrap(), - &BankingStageStats::default(), - false, - votes_only, - &Arc::new(RwLock::new(CostModel::default())), - ); + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + &Arc::new(FeatureSet::default()), + votes_only, + ); assert_eq!(3, txs.len()); assert_eq!(vec![0, 1, 2], tx_packet_index); votes_only = true; - let (txs, tx_packet_index, _retryable_packet_indexes) = - BankingStage::transactions_from_packets( - &packets, - &packet_indexes, - &Arc::new(FeatureSet::default()), - &RwLock::new(CostTracker::default()).read().unwrap(), - &BankingStageStats::default(), - false, - votes_only, - &Arc::new(RwLock::new(CostModel::default())), - ); + let (txs, tx_packet_index) = BankingStage::transactions_from_packets( + &packets, + &packet_indexes, + &Arc::new(FeatureSet::default()), + votes_only, + ); assert_eq!(3, txs.len()); assert_eq!(vec![0, 1, 2], tx_packet_index); } diff --git a/core/src/lib.rs b/core/src/lib.rs index 5d991f6997..45f58e6e21 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -32,6 +32,7 @@ pub mod optimistic_confirmation_verifier; pub mod outstanding_requests; pub mod packet_hasher; pub mod progress_map; +pub mod qos_service; pub mod repair_response; pub mod repair_service; pub mod repair_weight; diff --git a/core/src/qos_service.rs b/core/src/qos_service.rs new file mode 100644 index 0000000000..b5f88db38f --- /dev/null +++ b/core/src/qos_service.rs @@ -0,0 +1,242 @@ +//! Quality of service for block producer. +//! Provides logic and functions to allow a Leader to prioritize +//! how transactions are included in blocks, and optimize those blocks. +//! +use { + solana_measure::measure::Measure, + solana_runtime::{ + bank::Bank, + cost_model::{CostModel, TransactionCost}, + cost_tracker::CostTrackerError, + }, + solana_sdk::transaction::{self, SanitizedTransaction, TransactionError}, + std::sync::{Arc, RwLock}, +}; + +#[derive(Default)] +pub struct QosServiceStats { + compute_cost_time: u64, + cost_tracking_time: u64, + selected_txs_count: u64, + retried_txs_per_block_limit_count: u64, + retried_txs_per_account_limit_count: u64, +} + +impl QosServiceStats { + pub fn report(&mut self) { + datapoint_info!( + "qos-service-stats", + ("compute_cost_time", self.compute_cost_time, i64), + ("cost_tracking_time", self.cost_tracking_time, i64), + ("selected_txs_count", self.selected_txs_count, i64), + ( + "retried_txs_per_block_limit_count", + self.retried_txs_per_block_limit_count, + i64 + ), + ( + "retried_txs_per_account_limit_count", + self.retried_txs_per_account_limit_count, + i64 + ), + ); + } +} + +pub struct QosService { + cost_model: Arc>, +} + +impl QosService { + pub fn new(cost_model: Arc>) -> Self { + Self { cost_model } + } + + pub fn compute_transaction_costs<'a>( + &self, + transactions: impl Iterator, + demote_program_write_locks: bool, + stats: &mut QosServiceStats, + ) -> Vec { + let mut compute_cost_time = Measure::start("compute_cost_time"); + let cost_model = self.cost_model.read().unwrap(); + let txs_costs = transactions + .map(|tx| { + let cost = cost_model.calculate_cost(tx, demote_program_write_locks); + debug!( + "transaction {:?}, cost {:?}, cost sum {}", + tx, + cost, + cost.sum() + ); + cost + }) + .collect(); + compute_cost_time.stop(); + stats.compute_cost_time += compute_cost_time.as_us(); + txs_costs + } + + // Given a list of transactions and their costs, this function returns a corresponding + // list of Results that indicate if a transaction is selected to be included in the current block, + pub fn select_transactions_per_cost<'a>( + &self, + transactions: impl Iterator, + transactions_costs: impl Iterator, + bank: &Arc, + stats: &mut QosServiceStats, + ) -> Vec> { + let mut cost_tracking_time = Measure::start("cost_tracking_time"); + let mut cost_tracker = bank.write_cost_tracker().unwrap(); + let select_results = transactions + .zip(transactions_costs) + .map(|(tx, cost)| match cost_tracker.try_add(tx, cost) { + Ok(current_block_cost) => { + debug!("slot {:?}, transaction {:?}, cost {:?}, fit into current block, current block cost {}", bank.slot(), tx, cost, current_block_cost); + stats.selected_txs_count += 1; + Ok(()) + }, + Err(e) => { + debug!("slot {:?}, transaction {:?}, cost {:?}, not fit into current block, '{:?}'", bank.slot(), tx, cost, e); + match e { + CostTrackerError::WouldExceedBlockMaxLimit => { + stats.retried_txs_per_block_limit_count += 1; + Err(TransactionError::WouldExceedMaxBlockCostLimit) + } + CostTrackerError::WouldExceedAccountMaxLimit => { + stats.retried_txs_per_account_limit_count += 1; + Err(TransactionError::WouldExceedMaxAccountCostLimit) + } + } + } + }) + .collect(); + cost_tracking_time.stop(); + stats.cost_tracking_time += cost_tracking_time.as_us(); + select_results + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + itertools::Itertools, + solana_runtime::{ + bank::Bank, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + }, + solana_sdk::{ + hash::Hash, + signature::{Keypair, Signer}, + system_transaction, + }, + solana_vote_program::vote_transaction, + }; + + #[test] + fn test_compute_transactions_costs() { + solana_logger::setup(); + + // make a vec of txs + let keypair = Keypair::new(); + let transfer_tx = SanitizedTransaction::from_transaction_for_tests( + system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default()), + ); + let vote_tx = SanitizedTransaction::from_transaction_for_tests( + vote_transaction::new_vote_transaction( + vec![42], + Hash::default(), + Hash::default(), + &keypair, + &keypair, + &keypair, + None, + ), + ); + let txs = vec![transfer_tx.clone(), vote_tx.clone(), vote_tx, transfer_tx]; + + let cost_model = Arc::new(RwLock::new(CostModel::default())); + let qos_service = QosService::new(cost_model.clone()); + let txs_costs = qos_service.compute_transaction_costs( + txs.iter(), + false, + &mut QosServiceStats::default(), + ); + + // verify the size of txs_costs and its contents + assert_eq!(txs_costs.len(), txs.len()); + txs_costs + .iter() + .enumerate() + .map(|(index, cost)| { + assert_eq!( + cost.sum(), + cost_model + .read() + .unwrap() + .calculate_cost(&txs[index], false) + .sum() + ); + }) + .collect_vec(); + } + + #[test] + fn test_select_transactions_per_cost() { + solana_logger::setup(); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + let cost_model = Arc::new(RwLock::new(CostModel::default())); + + let keypair = Keypair::new(); + let transfer_tx = SanitizedTransaction::from_transaction_for_tests( + system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default()), + ); + let vote_tx = SanitizedTransaction::from_transaction_for_tests( + vote_transaction::new_vote_transaction( + vec![42], + Hash::default(), + Hash::default(), + &keypair, + &keypair, + &keypair, + None, + ), + ); + let transfer_tx_cost = cost_model + .read() + .unwrap() + .calculate_cost(&transfer_tx, false) + .sum(); + + // make a vec of txs + let txs = vec![transfer_tx.clone(), vote_tx.clone(), transfer_tx, vote_tx]; + + let qos_service = QosService::new(cost_model); + let txs_costs = qos_service.compute_transaction_costs( + txs.iter(), + false, + &mut QosServiceStats::default(), + ); + + // set cost tracker limit to fit 1 transfer tx, vote tx bypasses limit check + let cost_limit = transfer_tx_cost; + bank.write_cost_tracker() + .unwrap() + .set_limits(cost_limit, cost_limit); + let results = qos_service.select_transactions_per_cost( + txs.iter(), + txs_costs.iter(), + &bank, + &mut QosServiceStats::default(), + ); + + // verify that first transfer tx and all votes are allowed + assert_eq!(results.len(), txs.len()); + assert!(results[0].is_ok()); + assert!(results[1].is_ok()); + assert!(results[2].is_err()); + assert!(results[3].is_ok()); + } +} diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index cb6514fdaa..1d5d113870 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -932,6 +932,26 @@ impl Accounts { .collect() } + #[allow(clippy::needless_collect)] + pub fn lock_accounts_with_results<'a>( + &self, + txs: impl Iterator, + results: impl Iterator>, + demote_program_write_locks: bool, + ) -> Vec> { + let keys: Vec<_> = txs + .map(|tx| tx.get_account_locks(demote_program_write_locks)) + .collect(); + let account_locks = &mut self.account_locks.lock().unwrap(); + keys.into_iter() + .zip(results) + .map(|(keys, result)| match result { + Ok(()) => self.lock_account(account_locks, keys.writable, keys.readonly), + Err(e) => Err(e.clone()), + }) + .collect() + } + /// Once accounts are unlocked, new transactions that modify that state can enter the pipeline #[allow(clippy::needless_collect)] pub fn unlock_accounts<'a>( diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index b788bdaf6e..e3e494d4c6 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -230,7 +230,7 @@ impl ExecuteTimings { } type BankStatusCache = StatusCache>; -#[frozen_abi(digest = "5Br3PNyyX1L7XoS4jYLt5JTeMXowLSsu7v9LhokC8vnq")] +#[frozen_abi(digest = "7bCDimGo11ajw6ZHViBBu8KPfoDZBcwSnumWCU8MMuwr")] pub type BankSlotDelta = SlotDelta>; type TransactionAccountRefCells = Vec<(Pubkey, Rc>)>; @@ -3357,6 +3357,22 @@ impl Bank { TransactionBatch::new(lock_results, self, Cow::Borrowed(txs)) } + /// Prepare a locked transaction batch from a list of sanitized transactions, and their cost + /// limited packing status + pub fn prepare_sanitized_batch_with_results<'a, 'b>( + &'a self, + transactions: &'b [SanitizedTransaction], + transaction_results: impl Iterator>, + ) -> TransactionBatch<'a, 'b> { + // this lock_results could be: Ok, AccountInUse, WouldExceedBlockMaxLimit or WouldExceedAccountMaxLimit + let lock_results = self.rc.accounts.lock_accounts_with_results( + transactions.iter(), + transaction_results, + self.demote_program_write_locks(), + ); + TransactionBatch::new(lock_results, self, Cow::Borrowed(transactions)) + } + /// Prepare a transaction batch without locking accounts for transaction simulation. pub(crate) fn prepare_simulation_batch<'a>( &'a self, @@ -3772,6 +3788,8 @@ impl Bank { error_counters.account_in_use += 1; Some(index) } + Err(TransactionError::WouldExceedMaxBlockCostLimit) + | Err(TransactionError::WouldExceedMaxAccountCostLimit) => Some(index), Err(_) => None, Ok(_) => None, }) @@ -6413,7 +6431,7 @@ pub fn goto_end_of_slot(bank: &mut Bank) { } } -fn is_simple_vote_transaction(transaction: &SanitizedTransaction) -> bool { +pub fn is_simple_vote_transaction(transaction: &SanitizedTransaction) -> bool { if transaction.message().instructions().len() == 1 { let (program_pubkey, instruction) = transaction .message() diff --git a/runtime/src/cost_model.rs b/runtime/src/cost_model.rs index 20670d24bb..f6016ec99a 100644 --- a/runtime/src/cost_model.rs +++ b/runtime/src/cost_model.rs @@ -4,7 +4,9 @@ //! //! The main function is `calculate_cost` which returns &TransactionCost. //! -use crate::{block_cost_limits::*, execute_cost_table::ExecuteCostTable}; +use crate::{ + bank::is_simple_vote_transaction, block_cost_limits::*, execute_cost_table::ExecuteCostTable, +}; use log::*; use solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction}; use std::collections::HashMap; @@ -12,13 +14,31 @@ use std::collections::HashMap; const MAX_WRITABLE_ACCOUNTS: usize = 256; // costs are stored in number of 'compute unit's -#[derive(AbiExample, Default, Debug)] +#[derive(AbiExample, Debug)] pub struct TransactionCost { pub writable_accounts: Vec, pub signature_cost: u64, pub write_lock_cost: u64, pub data_bytes_cost: u64, pub execution_cost: u64, + // `cost_weight` is a multiplier to be applied to tx cost, that + // allows to increase/decrease tx cost linearly based on algo. + // for example, vote tx could have weight zero to bypass cost + // limit checking during block packing. + pub cost_weight: u32, +} + +impl Default for TransactionCost { + fn default() -> Self { + Self { + writable_accounts: Vec::with_capacity(MAX_WRITABLE_ACCOUNTS), + signature_cost: 0u64, + write_lock_cost: 0u64, + data_bytes_cost: 0u64, + execution_cost: 0u64, + cost_weight: 1u32, + } + } } impl TransactionCost { @@ -35,6 +55,7 @@ impl TransactionCost { self.write_lock_cost = 0; self.data_bytes_cost = 0; self.execution_cost = 0; + self.cost_weight = 1; } pub fn sum(&self) -> u64 { @@ -95,6 +116,7 @@ impl CostModel { self.get_write_lock_cost(&mut tx_cost, transaction, demote_program_write_locks); tx_cost.data_bytes_cost = self.get_data_bytes_cost(transaction); tx_cost.execution_cost = self.get_transaction_cost(transaction); + tx_cost.cost_weight = self.calculate_cost_weight(transaction); debug!("transaction {:?} has cost {:?}", transaction, tx_cost); tx_cost @@ -177,6 +199,15 @@ impl CostModel { } } } + + fn calculate_cost_weight(&self, transaction: &SanitizedTransaction) -> u32 { + if is_simple_vote_transaction(transaction) { + // vote has zero cost weight, so it bypasses block cost limit checking + 0u32 + } else { + 1u32 + } + } } #[cfg(test)] @@ -196,6 +227,7 @@ mod tests { system_program, system_transaction, transaction::Transaction, }; + use solana_vote_program::vote_transaction; use std::{ str::FromStr, sync::{Arc, RwLock}, @@ -394,6 +426,7 @@ mod tests { assert_eq!(expected_account_cost, tx_cost.write_lock_cost); assert_eq!(expected_execution_cost, tx_cost.execution_cost); assert_eq!(2, tx_cost.writable_accounts.len()); + assert_eq!(1u32, tx_cost.cost_weight); } #[test] @@ -500,4 +533,31 @@ mod tests { .get_cost(&solana_vote_program::id()) .is_some()); } + + #[test] + fn test_calculate_cost_weight() { + let (mint_keypair, start_hash) = test_setup(); + + let keypair = Keypair::new(); + let simple_transaction = SanitizedTransaction::from_transaction_for_tests( + system_transaction::transfer(&mint_keypair, &keypair.pubkey(), 2, start_hash), + ); + let vote_transaction = SanitizedTransaction::from_transaction_for_tests( + vote_transaction::new_vote_transaction( + vec![42], + Hash::default(), + Hash::default(), + &keypair, + &keypair, + &keypair, + None, + ), + ); + + let testee = CostModel::default(); + + // For now, vote has zero weight, everything else is neutral, for now + assert_eq!(1u32, testee.calculate_cost_weight(&simple_transaction)); + assert_eq!(0u32, testee.calculate_cost_weight(&vote_transaction)); + } } diff --git a/runtime/src/cost_tracker.rs b/runtime/src/cost_tracker.rs index e2f1390b96..1a2765c3a6 100644 --- a/runtime/src/cost_tracker.rs +++ b/runtime/src/cost_tracker.rs @@ -72,7 +72,7 @@ impl CostTracker { _transaction: &SanitizedTransaction, tx_cost: &TransactionCost, ) -> Result { - let cost = tx_cost.sum(); + let cost = tx_cost.sum() * tx_cost.cost_weight as u64; self.would_fit(&tx_cost.writable_accounts, &cost)?; self.add_transaction(&tx_cost.writable_accounts, &cost); Ok(self.block_cost) @@ -369,4 +369,26 @@ mod tests { assert_eq!(acct2, costliest_account); } } + + #[test] + fn test_try_add_with_cost_weight() { + let (mint_keypair, start_hash) = test_setup(); + let (tx, _keys, _cost) = build_simple_transaction(&mint_keypair, &start_hash); + let tx = SanitizedTransaction::from_transaction_for_tests(tx); + + let limit = 100u64; + let mut testee = CostTracker::new(limit, limit); + + let mut cost = TransactionCost { + execution_cost: limit + 1, + ..TransactionCost::default() + }; + + // cost exceed limit by 1, will not fit + assert!(testee.try_add(&tx, &cost).is_err()); + + cost.cost_weight = 0u32; + // setting cost_weight to zero will allow this tx + assert!(testee.try_add(&tx, &cost).is_ok()); + } } diff --git a/sdk/src/transaction/mod.rs b/sdk/src/transaction/mod.rs index 54ee2d21d1..117179f351 100644 --- a/sdk/src/transaction/mod.rs +++ b/sdk/src/transaction/mod.rs @@ -110,9 +110,8 @@ pub enum TransactionError { #[error("Transaction processing left an account with an outstanding borrowed reference")] AccountBorrowOutstanding, - #[error( - "Transaction could not fit into current block without exceeding the Max Block Cost Limit" - )] + /// Transaction would exceed max Block Cost Limit + #[error("Transaction would exceed max Block Cost Limit")] WouldExceedMaxBlockCostLimit, /// Transaction version is unsupported @@ -122,6 +121,10 @@ pub enum TransactionError { /// Transaction loads a writable account that cannot be written #[error("Transaction loads a writable account that cannot be written")] InvalidWritableAccount, + + /// Transaction would exceed max account limit within the block + #[error("Transaction would exceed max account limit within the block")] + WouldExceedMaxAccountCostLimit, } pub type Result = result::Result; diff --git a/storage-proto/proto/transaction_by_addr.proto b/storage-proto/proto/transaction_by_addr.proto index 0278d81d7c..36c17832ee 100644 --- a/storage-proto/proto/transaction_by_addr.proto +++ b/storage-proto/proto/transaction_by_addr.proto @@ -44,6 +44,7 @@ enum TransactionErrorType { WOULD_EXCEED_MAX_BLOCK_COST_LIMIT = 17; UNSUPPORTED_VERSION = 18; INVALID_WRITABLE_ACCOUNT = 19; + WOULD_EXCEED_MAX_ACCOUNT_COST_LIMIT = 20; } message InstructionError { diff --git a/storage-proto/src/convert.rs b/storage-proto/src/convert.rs index c35fac5ed5..408877d661 100644 --- a/storage-proto/src/convert.rs +++ b/storage-proto/src/convert.rs @@ -553,6 +553,7 @@ impl TryFrom for TransactionError { 17 => TransactionError::WouldExceedMaxBlockCostLimit, 18 => TransactionError::UnsupportedVersion, 19 => TransactionError::InvalidWritableAccount, + 20 => TransactionError::WouldExceedMaxAccountCostLimit, _ => return Err("Invalid TransactionError"), }) } @@ -620,6 +621,9 @@ impl From for tx_by_addr::TransactionError { TransactionError::InvalidWritableAccount => { tx_by_addr::TransactionErrorType::InvalidWritableAccount } + TransactionError::WouldExceedMaxAccountCostLimit => { + tx_by_addr::TransactionErrorType::WouldExceedMaxAccountCostLimit + } } as i32, instruction_error: match transaction_error { TransactionError::InstructionError(index, ref instruction_error) => { @@ -1031,6 +1035,14 @@ mod test { tx_by_addr_transaction_error.try_into().unwrap() ); + let transaction_error = TransactionError::WouldExceedMaxAccountCostLimit; + let tx_by_addr_transaction_error: tx_by_addr::TransactionError = + transaction_error.clone().into(); + assert_eq!( + transaction_error, + tx_by_addr_transaction_error.try_into().unwrap() + ); + let transaction_error = TransactionError::UnsupportedVersion; let tx_by_addr_transaction_error: tx_by_addr::TransactionError = transaction_error.clone().into();