refactor cost calculation (#21062)

* - cache calculated transaction cost to allow sharing;
- atomic cost tracking op;
- only lock accounts for transactions eligible for current block;
- moved qos service and stats reporting to its own model;
- add cost_weight default to neutral (as 1), vote has zero weight;

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>

* Update core/src/qos_service.rs

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>

* Update core/src/qos_service.rs

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
This commit is contained in:
Tao Zhu 2021-11-12 01:04:53 -06:00 committed by GitHub
parent ef29d2d172
commit 11153e1f87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 479 additions and 228 deletions

View File

@ -1,7 +1,10 @@
//! The `banking_stage` processes Transaction messages. It is intended to be used //! The `banking_stage` processes Transaction messages. It is intended to be used
//! to contruct a software pipeline. The stage uses all available CPU cores and //! to contruct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU. //! can do its processing in parallel with signature verification on the GPU.
use crate::packet_hasher::PacketHasher; use crate::{
packet_hasher::PacketHasher,
qos_service::{QosService, QosServiceStats},
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools; use itertools::Itertools;
use lru::LruCache; use lru::LruCache;
@ -26,7 +29,6 @@ use solana_runtime::{
}, },
bank_utils, bank_utils,
cost_model::CostModel, cost_model::CostModel,
cost_tracker::CostTracker,
transaction_batch::TransactionBatch, transaction_batch::TransactionBatch,
vote_sender_types::ReplayVoteSender, vote_sender_types::ReplayVoteSender,
}; };
@ -55,7 +57,7 @@ use std::{
net::{SocketAddr, UdpSocket}, net::{SocketAddr, UdpSocket},
ops::DerefMut, ops::DerefMut,
sync::atomic::{AtomicU64, AtomicUsize, Ordering}, sync::atomic::{AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Mutex, RwLock, RwLockReadGuard}, sync::{Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
time::Duration, time::Duration,
time::Instant, time::Instant,
@ -97,8 +99,6 @@ pub struct BankingStageStats {
current_buffered_packet_batches_count: AtomicUsize, current_buffered_packet_batches_count: AtomicUsize,
rebuffered_packets_count: AtomicUsize, rebuffered_packets_count: AtomicUsize,
consumed_buffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize,
cost_tracker_check_count: AtomicUsize,
cost_forced_retry_transactions_count: AtomicUsize,
// Timing // Timing
consume_buffered_packets_elapsed: AtomicU64, consume_buffered_packets_elapsed: AtomicU64,
@ -109,9 +109,6 @@ pub struct BankingStageStats {
packet_conversion_elapsed: AtomicU64, packet_conversion_elapsed: AtomicU64,
unprocessed_packet_conversion_elapsed: AtomicU64, unprocessed_packet_conversion_elapsed: AtomicU64,
transaction_processing_elapsed: AtomicU64, transaction_processing_elapsed: AtomicU64,
cost_tracker_update_elapsed: AtomicU64,
cost_tracker_clone_elapsed: AtomicU64,
cost_tracker_check_elapsed: AtomicU64,
} }
impl BankingStageStats { impl BankingStageStats {
@ -181,17 +178,6 @@ impl BankingStageStats {
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed) as i64,
i64 i64
), ),
(
"cost_tracker_check_count",
self.cost_tracker_check_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"cost_forced_retry_transactions_count",
self.cost_forced_retry_transactions_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
( (
"consume_buffered_packets_elapsed", "consume_buffered_packets_elapsed",
self.consume_buffered_packets_elapsed self.consume_buffered_packets_elapsed
@ -238,21 +224,6 @@ impl BankingStageStats {
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed) as i64,
i64 i64
), ),
(
"cost_tracker_update_elapsed",
self.cost_tracker_update_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"cost_tracker_clone_elapsed",
self.cost_tracker_clone_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"cost_tracker_check_elapsed",
self.cost_tracker_check_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
); );
} }
} }
@ -457,7 +428,6 @@ impl BankingStage {
my_pubkey, my_pubkey,
*next_leader, *next_leader,
banking_stage_stats, banking_stage_stats,
cost_model,
); );
Self::update_buffered_packets_with_new_unprocessed( Self::update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes, original_unprocessed_indexes,
@ -960,13 +930,32 @@ impl BankingStage {
chunk_offset: usize, chunk_offset: usize,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender, gossip_vote_sender: &ReplayVoteSender,
cost_model: &Arc<RwLock<CostModel>>,
) -> (Result<usize, PohRecorderError>, Vec<usize>) { ) -> (Result<usize, PohRecorderError>, Vec<usize>) {
let mut lock_time = Measure::start("lock_time"); let mut qos_service_stats = QosServiceStats::default();
let qos_service = QosService::new(cost_model.clone());
let tx_costs = qos_service.compute_transaction_costs(
txs.iter(),
bank.demote_program_write_locks(),
&mut qos_service_stats,
);
let transactions_qos_results = qos_service.select_transactions_per_cost(
txs.iter(),
tx_costs.iter(),
bank,
&mut qos_service_stats,
);
// Only lock accounts for those transactions are selected for the block;
// Once accounts are locked, other threads cannot encode transactions that will modify the // Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state // same account state
let batch = bank.prepare_sanitized_batch(txs); let mut lock_time = Measure::start("lock_time");
let batch = bank.prepare_sanitized_batch_with_results(txs, transactions_qos_results.iter());
lock_time.stop(); lock_time.stop();
// retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit and
// WouldExceedMaxAccountCostLimit
let (result, mut retryable_txs) = Self::process_and_record_transactions_locked( let (result, mut retryable_txs) = Self::process_and_record_transactions_locked(
bank, bank,
poh, poh,
@ -989,6 +978,8 @@ impl BankingStage {
txs.len(), txs.len(),
); );
qos_service_stats.report();
(result, retryable_txs) (result, retryable_txs)
} }
@ -1003,6 +994,7 @@ impl BankingStage {
poh: &TransactionRecorder, poh: &TransactionRecorder,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender, gossip_vote_sender: &ReplayVoteSender,
cost_model: &Arc<RwLock<CostModel>>,
) -> (usize, Vec<usize>) { ) -> (usize, Vec<usize>) {
let mut chunk_start = 0; let mut chunk_start = 0;
let mut unprocessed_txs = vec![]; let mut unprocessed_txs = vec![];
@ -1019,6 +1011,7 @@ impl BankingStage {
chunk_start, chunk_start,
transaction_status_sender.clone(), transaction_status_sender.clone(),
gossip_vote_sender, gossip_vote_sender,
cost_model,
); );
trace!("process_transactions result: {:?}", result); trace!("process_transactions result: {:?}", result);
@ -1087,24 +1080,17 @@ impl BankingStage {
Some(&packet.data[msg_start..msg_end]) Some(&packet.data[msg_start..msg_end])
} }
// This function deserializes packets into transactions, computes the blake3 hash of transaction messages, // This function deserializes packets into transactions, computes the blake3 hash of transaction
// and verifies secp256k1 instructions. A list of valid transactions are returned with their message hashes // messages, and verifies secp256k1 instructions. A list of sanitized transactions are returned
// and packet indexes. // with their packet indexes.
// Also returned is packet indexes for transaction should be retried due to cost limits.
#[allow(clippy::needless_collect)] #[allow(clippy::needless_collect)]
fn transactions_from_packets( fn transactions_from_packets(
msgs: &Packets, msgs: &Packets,
transaction_indexes: &[usize], transaction_indexes: &[usize],
feature_set: &Arc<feature_set::FeatureSet>, feature_set: &Arc<feature_set::FeatureSet>,
read_cost_tracker: &RwLockReadGuard<CostTracker>,
banking_stage_stats: &BankingStageStats,
demote_program_write_locks: bool,
votes_only: bool, votes_only: bool,
cost_model: &Arc<RwLock<CostModel>>, ) -> (Vec<SanitizedTransaction>, Vec<usize>) {
) -> (Vec<SanitizedTransaction>, Vec<usize>, Vec<usize>) { transaction_indexes
let mut retryable_transaction_packet_indexes: Vec<usize> = vec![];
let verified_transactions_with_packet_indexes: Vec<_> = transaction_indexes
.iter() .iter()
.filter_map(|tx_index| { .filter_map(|tx_index| {
let p = &msgs.packets[*tx_index]; let p = &msgs.packets[*tx_index];
@ -1125,51 +1111,7 @@ impl BankingStage {
tx.verify_precompiles(feature_set).ok()?; tx.verify_precompiles(feature_set).ok()?;
Some((tx, *tx_index)) Some((tx, *tx_index))
}) })
.collect(); .unzip()
banking_stage_stats.cost_tracker_check_count.fetch_add(
verified_transactions_with_packet_indexes.len(),
Ordering::Relaxed,
);
let mut cost_tracker_check_time = Measure::start("cost_tracker_check_time");
let (filtered_transactions, filter_transaction_packet_indexes) = {
verified_transactions_with_packet_indexes
.into_iter()
.filter_map(|(tx, tx_index)| {
// excluding vote TX from cost_model, for now
let is_vote = &msgs.packets[tx_index].meta.is_simple_vote_tx;
if !is_vote
&& read_cost_tracker
.would_transaction_fit(
&tx,
&cost_model
.read()
.unwrap()
.calculate_cost(&tx, demote_program_write_locks),
)
.is_err()
{
// put transaction into retry queue if it wouldn't fit
// into current bank
debug!("transaction {:?} would exceed limit", tx);
retryable_transaction_packet_indexes.push(tx_index);
return None;
}
Some((tx, tx_index))
})
.unzip()
};
cost_tracker_check_time.stop();
banking_stage_stats
.cost_tracker_check_elapsed
.fetch_add(cost_tracker_check_time.as_us(), Ordering::Relaxed);
(
filtered_transactions,
filter_transaction_packet_indexes,
retryable_transaction_packet_indexes,
)
} }
/// This function filters pending packets that are still valid /// This function filters pending packets that are still valid
@ -1224,30 +1166,15 @@ impl BankingStage {
cost_model: &Arc<RwLock<CostModel>>, cost_model: &Arc<RwLock<CostModel>>,
) -> (usize, usize, Vec<usize>) { ) -> (usize, usize, Vec<usize>) {
let mut packet_conversion_time = Measure::start("packet_conversion"); let mut packet_conversion_time = Measure::start("packet_conversion");
let (transactions, transaction_to_packet_indexes, retryable_packet_indexes) = let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
Self::transactions_from_packets( msgs,
msgs, &packet_indexes,
&packet_indexes, &bank.feature_set,
&bank.feature_set, bank.vote_only_bank(),
&bank.read_cost_tracker().unwrap(), );
banking_stage_stats,
bank.demote_program_write_locks(),
bank.vote_only_bank(),
cost_model,
);
packet_conversion_time.stop(); packet_conversion_time.stop();
inc_new_counter_info!("banking_stage-packet_conversion", 1); inc_new_counter_info!("banking_stage-packet_conversion", 1);
banking_stage_stats
.cost_forced_retry_transactions_count
.fetch_add(retryable_packet_indexes.len(), Ordering::Relaxed);
debug!(
"bank: {} filtered transactions {} cost limited transactions {}",
bank.slot(),
transactions.len(),
retryable_packet_indexes.len()
);
let tx_len = transactions.len(); let tx_len = transactions.len();
let mut process_tx_time = Measure::start("process_tx_time"); let mut process_tx_time = Measure::start("process_tx_time");
@ -1258,6 +1185,7 @@ impl BankingStage {
poh, poh,
transaction_status_sender, transaction_status_sender,
gossip_vote_sender, gossip_vote_sender,
cost_model,
); );
process_tx_time.stop(); process_tx_time.stop();
let unprocessed_tx_count = unprocessed_tx_indexes.len(); let unprocessed_tx_count = unprocessed_tx_indexes.len();
@ -1266,23 +1194,8 @@ impl BankingStage {
unprocessed_tx_count unprocessed_tx_count
); );
// applying cost of processed transactions to shared cost_tracker
let mut cost_tracking_time = Measure::start("cost_tracking_time");
transactions.iter().enumerate().for_each(|(index, tx)| {
if unprocessed_tx_indexes.iter().all(|&i| i != index) {
bank.write_cost_tracker().unwrap().add_transaction_cost(
tx,
&cost_model
.read()
.unwrap()
.calculate_cost(tx, bank.demote_program_write_locks()),
);
}
});
cost_tracking_time.stop();
let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time"); let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time");
let mut filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs(
bank, bank,
&transactions, &transactions,
&transaction_to_packet_indexes, &transaction_to_packet_indexes,
@ -1295,19 +1208,12 @@ impl BankingStage {
unprocessed_tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) unprocessed_tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len())
); );
// combine cost-related unprocessed transactions with bank determined unprocessed for
// buffering
filtered_unprocessed_packet_indexes.extend(retryable_packet_indexes);
banking_stage_stats banking_stage_stats
.packet_conversion_elapsed .packet_conversion_elapsed
.fetch_add(packet_conversion_time.as_us(), Ordering::Relaxed); .fetch_add(packet_conversion_time.as_us(), Ordering::Relaxed);
banking_stage_stats banking_stage_stats
.transaction_processing_elapsed .transaction_processing_elapsed
.fetch_add(process_tx_time.as_us(), Ordering::Relaxed); .fetch_add(process_tx_time.as_us(), Ordering::Relaxed);
banking_stage_stats
.cost_tracker_update_elapsed
.fetch_add(cost_tracking_time.as_us(), Ordering::Relaxed);
banking_stage_stats banking_stage_stats
.filter_pending_packets_elapsed .filter_pending_packets_elapsed
.fetch_add(filter_pending_packets_time.as_us(), Ordering::Relaxed); .fetch_add(filter_pending_packets_time.as_us(), Ordering::Relaxed);
@ -1322,7 +1228,6 @@ impl BankingStage {
my_pubkey: &Pubkey, my_pubkey: &Pubkey,
next_leader: Option<Pubkey>, next_leader: Option<Pubkey>,
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
cost_model: &Arc<RwLock<CostModel>>,
) -> Vec<usize> { ) -> Vec<usize> {
// Check if we are the next leader. If so, let's not filter the packets // Check if we are the next leader. If so, let's not filter the packets
// as we'll filter it again while processing the packets. // as we'll filter it again while processing the packets.
@ -1335,31 +1240,24 @@ impl BankingStage {
let mut unprocessed_packet_conversion_time = let mut unprocessed_packet_conversion_time =
Measure::start("unprocessed_packet_conversion"); Measure::start("unprocessed_packet_conversion");
let (transactions, transaction_to_packet_indexes, retry_packet_indexes) = let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
Self::transactions_from_packets( msgs,
msgs, transaction_indexes,
transaction_indexes, &bank.feature_set,
&bank.feature_set, bank.vote_only_bank(),
&bank.read_cost_tracker().unwrap(), );
banking_stage_stats,
bank.demote_program_write_locks(),
bank.vote_only_bank(),
cost_model,
);
unprocessed_packet_conversion_time.stop(); unprocessed_packet_conversion_time.stop();
let tx_count = transaction_to_packet_indexes.len(); let tx_count = transaction_to_packet_indexes.len();
let unprocessed_tx_indexes = (0..transactions.len()).collect_vec(); let unprocessed_tx_indexes = (0..transactions.len()).collect_vec();
let mut filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs(
bank, bank,
&transactions, &transactions,
&transaction_to_packet_indexes, &transaction_to_packet_indexes,
&unprocessed_tx_indexes, &unprocessed_tx_indexes,
); );
filtered_unprocessed_packet_indexes.extend(retry_packet_indexes);
inc_new_counter_info!( inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding", "banking_stage-dropped_tx_before_forwarding",
tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len())
@ -1496,7 +1394,6 @@ impl BankingStage {
my_pubkey, my_pubkey,
next_leader, next_leader,
banking_stage_stats, banking_stage_stats,
cost_model,
); );
Self::push_unprocessed( Self::push_unprocessed(
buffered_packets, buffered_packets,
@ -2342,6 +2239,7 @@ mod tests {
0, 0,
None, None,
&gossip_vote_sender, &gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
) )
.0 .0
.unwrap(); .unwrap();
@ -2383,6 +2281,7 @@ mod tests {
0, 0,
None, None,
&gossip_vote_sender, &gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
) )
.0, .0,
Err(PohRecorderError::MaxHeightReached) Err(PohRecorderError::MaxHeightReached)
@ -2470,6 +2369,7 @@ mod tests {
0, 0,
None, None,
&gossip_vote_sender, &gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
); );
poh_recorder poh_recorder
@ -2578,6 +2478,7 @@ mod tests {
&recorder, &recorder,
None, None,
&gossip_vote_sender, &gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
); );
assert_eq!(processed_transactions_count, 0,); assert_eq!(processed_transactions_count, 0,);
@ -2670,6 +2571,7 @@ mod tests {
enable_cpi_and_log_storage: false, enable_cpi_and_log_storage: false,
}), }),
&gossip_vote_sender, &gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
); );
transaction_status_service.join().unwrap(); transaction_status_service.join().unwrap();
@ -3130,32 +3032,22 @@ mod tests {
make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes); make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes);
let mut votes_only = false; let mut votes_only = false;
let (txs, tx_packet_index, _retryable_packet_indexes) = let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
BankingStage::transactions_from_packets( &packets,
&packets, &packet_indexes,
&packet_indexes, &Arc::new(FeatureSet::default()),
&Arc::new(FeatureSet::default()), votes_only,
&RwLock::new(CostTracker::default()).read().unwrap(), );
&BankingStageStats::default(),
false,
votes_only,
&Arc::new(RwLock::new(CostModel::default())),
);
assert_eq!(2, txs.len()); assert_eq!(2, txs.len());
assert_eq!(vec![0, 1], tx_packet_index); assert_eq!(vec![0, 1], tx_packet_index);
votes_only = true; votes_only = true;
let (txs, tx_packet_index, _retryable_packet_indexes) = let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
BankingStage::transactions_from_packets( &packets,
&packets, &packet_indexes,
&packet_indexes, &Arc::new(FeatureSet::default()),
&Arc::new(FeatureSet::default()), votes_only,
&RwLock::new(CostTracker::default()).read().unwrap(), );
&BankingStageStats::default(),
false,
votes_only,
&Arc::new(RwLock::new(CostModel::default())),
);
assert_eq!(0, txs.len()); assert_eq!(0, txs.len());
assert_eq!(0, tx_packet_index.len()); assert_eq!(0, tx_packet_index.len());
} }
@ -3169,32 +3061,22 @@ mod tests {
); );
let mut votes_only = false; let mut votes_only = false;
let (txs, tx_packet_index, _retryable_packet_indexes) = let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
BankingStage::transactions_from_packets( &packets,
&packets, &packet_indexes,
&packet_indexes, &Arc::new(FeatureSet::default()),
&Arc::new(FeatureSet::default()), votes_only,
&RwLock::new(CostTracker::default()).read().unwrap(), );
&BankingStageStats::default(),
false,
votes_only,
&Arc::new(RwLock::new(CostModel::default())),
);
assert_eq!(3, txs.len()); assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index); assert_eq!(vec![0, 1, 2], tx_packet_index);
votes_only = true; votes_only = true;
let (txs, tx_packet_index, _retryable_packet_indexes) = let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
BankingStage::transactions_from_packets( &packets,
&packets, &packet_indexes,
&packet_indexes, &Arc::new(FeatureSet::default()),
&Arc::new(FeatureSet::default()), votes_only,
&RwLock::new(CostTracker::default()).read().unwrap(), );
&BankingStageStats::default(),
false,
votes_only,
&Arc::new(RwLock::new(CostModel::default())),
);
assert_eq!(2, txs.len()); assert_eq!(2, txs.len());
assert_eq!(vec![0, 2], tx_packet_index); assert_eq!(vec![0, 2], tx_packet_index);
} }
@ -3208,32 +3090,22 @@ mod tests {
); );
let mut votes_only = false; let mut votes_only = false;
let (txs, tx_packet_index, _retryable_packet_indexes) = let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
BankingStage::transactions_from_packets( &packets,
&packets, &packet_indexes,
&packet_indexes, &Arc::new(FeatureSet::default()),
&Arc::new(FeatureSet::default()), votes_only,
&RwLock::new(CostTracker::default()).read().unwrap(), );
&BankingStageStats::default(),
false,
votes_only,
&Arc::new(RwLock::new(CostModel::default())),
);
assert_eq!(3, txs.len()); assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index); assert_eq!(vec![0, 1, 2], tx_packet_index);
votes_only = true; votes_only = true;
let (txs, tx_packet_index, _retryable_packet_indexes) = let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
BankingStage::transactions_from_packets( &packets,
&packets, &packet_indexes,
&packet_indexes, &Arc::new(FeatureSet::default()),
&Arc::new(FeatureSet::default()), votes_only,
&RwLock::new(CostTracker::default()).read().unwrap(), );
&BankingStageStats::default(),
false,
votes_only,
&Arc::new(RwLock::new(CostModel::default())),
);
assert_eq!(3, txs.len()); assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index); assert_eq!(vec![0, 1, 2], tx_packet_index);
} }

View File

@ -32,6 +32,7 @@ pub mod optimistic_confirmation_verifier;
pub mod outstanding_requests; pub mod outstanding_requests;
pub mod packet_hasher; pub mod packet_hasher;
pub mod progress_map; pub mod progress_map;
pub mod qos_service;
pub mod repair_response; pub mod repair_response;
pub mod repair_service; pub mod repair_service;
pub mod repair_weight; pub mod repair_weight;

242
core/src/qos_service.rs Normal file
View File

@ -0,0 +1,242 @@
//! Quality of service for block producer.
//! Provides logic and functions to allow a Leader to prioritize
//! how transactions are included in blocks, and optimize those blocks.
//!
use {
solana_measure::measure::Measure,
solana_runtime::{
bank::Bank,
cost_model::{CostModel, TransactionCost},
cost_tracker::CostTrackerError,
},
solana_sdk::transaction::{self, SanitizedTransaction, TransactionError},
std::sync::{Arc, RwLock},
};
#[derive(Default)]
pub struct QosServiceStats {
compute_cost_time: u64,
cost_tracking_time: u64,
selected_txs_count: u64,
retried_txs_per_block_limit_count: u64,
retried_txs_per_account_limit_count: u64,
}
impl QosServiceStats {
pub fn report(&mut self) {
datapoint_info!(
"qos-service-stats",
("compute_cost_time", self.compute_cost_time, i64),
("cost_tracking_time", self.cost_tracking_time, i64),
("selected_txs_count", self.selected_txs_count, i64),
(
"retried_txs_per_block_limit_count",
self.retried_txs_per_block_limit_count,
i64
),
(
"retried_txs_per_account_limit_count",
self.retried_txs_per_account_limit_count,
i64
),
);
}
}
pub struct QosService {
cost_model: Arc<RwLock<CostModel>>,
}
impl QosService {
pub fn new(cost_model: Arc<RwLock<CostModel>>) -> Self {
Self { cost_model }
}
pub fn compute_transaction_costs<'a>(
&self,
transactions: impl Iterator<Item = &'a SanitizedTransaction>,
demote_program_write_locks: bool,
stats: &mut QosServiceStats,
) -> Vec<TransactionCost> {
let mut compute_cost_time = Measure::start("compute_cost_time");
let cost_model = self.cost_model.read().unwrap();
let txs_costs = transactions
.map(|tx| {
let cost = cost_model.calculate_cost(tx, demote_program_write_locks);
debug!(
"transaction {:?}, cost {:?}, cost sum {}",
tx,
cost,
cost.sum()
);
cost
})
.collect();
compute_cost_time.stop();
stats.compute_cost_time += compute_cost_time.as_us();
txs_costs
}
// Given a list of transactions and their costs, this function returns a corresponding
// list of Results that indicate if a transaction is selected to be included in the current block,
pub fn select_transactions_per_cost<'a>(
&self,
transactions: impl Iterator<Item = &'a SanitizedTransaction>,
transactions_costs: impl Iterator<Item = &'a TransactionCost>,
bank: &Arc<Bank>,
stats: &mut QosServiceStats,
) -> Vec<transaction::Result<()>> {
let mut cost_tracking_time = Measure::start("cost_tracking_time");
let mut cost_tracker = bank.write_cost_tracker().unwrap();
let select_results = transactions
.zip(transactions_costs)
.map(|(tx, cost)| match cost_tracker.try_add(tx, cost) {
Ok(current_block_cost) => {
debug!("slot {:?}, transaction {:?}, cost {:?}, fit into current block, current block cost {}", bank.slot(), tx, cost, current_block_cost);
stats.selected_txs_count += 1;
Ok(())
},
Err(e) => {
debug!("slot {:?}, transaction {:?}, cost {:?}, not fit into current block, '{:?}'", bank.slot(), tx, cost, e);
match e {
CostTrackerError::WouldExceedBlockMaxLimit => {
stats.retried_txs_per_block_limit_count += 1;
Err(TransactionError::WouldExceedMaxBlockCostLimit)
}
CostTrackerError::WouldExceedAccountMaxLimit => {
stats.retried_txs_per_account_limit_count += 1;
Err(TransactionError::WouldExceedMaxAccountCostLimit)
}
}
}
})
.collect();
cost_tracking_time.stop();
stats.cost_tracking_time += cost_tracking_time.as_us();
select_results
}
}
#[cfg(test)]
mod tests {
use {
super::*,
itertools::Itertools,
solana_runtime::{
bank::Bank,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
},
solana_sdk::{
hash::Hash,
signature::{Keypair, Signer},
system_transaction,
},
solana_vote_program::vote_transaction,
};
#[test]
fn test_compute_transactions_costs() {
solana_logger::setup();
// make a vec of txs
let keypair = Keypair::new();
let transfer_tx = SanitizedTransaction::from_transaction_for_tests(
system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default()),
);
let vote_tx = SanitizedTransaction::from_transaction_for_tests(
vote_transaction::new_vote_transaction(
vec![42],
Hash::default(),
Hash::default(),
&keypair,
&keypair,
&keypair,
None,
),
);
let txs = vec![transfer_tx.clone(), vote_tx.clone(), vote_tx, transfer_tx];
let cost_model = Arc::new(RwLock::new(CostModel::default()));
let qos_service = QosService::new(cost_model.clone());
let txs_costs = qos_service.compute_transaction_costs(
txs.iter(),
false,
&mut QosServiceStats::default(),
);
// verify the size of txs_costs and its contents
assert_eq!(txs_costs.len(), txs.len());
txs_costs
.iter()
.enumerate()
.map(|(index, cost)| {
assert_eq!(
cost.sum(),
cost_model
.read()
.unwrap()
.calculate_cost(&txs[index], false)
.sum()
);
})
.collect_vec();
}
#[test]
fn test_select_transactions_per_cost() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let cost_model = Arc::new(RwLock::new(CostModel::default()));
let keypair = Keypair::new();
let transfer_tx = SanitizedTransaction::from_transaction_for_tests(
system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default()),
);
let vote_tx = SanitizedTransaction::from_transaction_for_tests(
vote_transaction::new_vote_transaction(
vec![42],
Hash::default(),
Hash::default(),
&keypair,
&keypair,
&keypair,
None,
),
);
let transfer_tx_cost = cost_model
.read()
.unwrap()
.calculate_cost(&transfer_tx, false)
.sum();
// make a vec of txs
let txs = vec![transfer_tx.clone(), vote_tx.clone(), transfer_tx, vote_tx];
let qos_service = QosService::new(cost_model);
let txs_costs = qos_service.compute_transaction_costs(
txs.iter(),
false,
&mut QosServiceStats::default(),
);
// set cost tracker limit to fit 1 transfer tx, vote tx bypasses limit check
let cost_limit = transfer_tx_cost;
bank.write_cost_tracker()
.unwrap()
.set_limits(cost_limit, cost_limit);
let results = qos_service.select_transactions_per_cost(
txs.iter(),
txs_costs.iter(),
&bank,
&mut QosServiceStats::default(),
);
// verify that first transfer tx and all votes are allowed
assert_eq!(results.len(), txs.len());
assert!(results[0].is_ok());
assert!(results[1].is_ok());
assert!(results[2].is_err());
assert!(results[3].is_ok());
}
}

View File

@ -932,6 +932,26 @@ impl Accounts {
.collect() .collect()
} }
#[allow(clippy::needless_collect)]
pub fn lock_accounts_with_results<'a>(
&self,
txs: impl Iterator<Item = &'a SanitizedTransaction>,
results: impl Iterator<Item = &'a Result<()>>,
demote_program_write_locks: bool,
) -> Vec<Result<()>> {
let keys: Vec<_> = txs
.map(|tx| tx.get_account_locks(demote_program_write_locks))
.collect();
let account_locks = &mut self.account_locks.lock().unwrap();
keys.into_iter()
.zip(results)
.map(|(keys, result)| match result {
Ok(()) => self.lock_account(account_locks, keys.writable, keys.readonly),
Err(e) => Err(e.clone()),
})
.collect()
}
/// Once accounts are unlocked, new transactions that modify that state can enter the pipeline /// Once accounts are unlocked, new transactions that modify that state can enter the pipeline
#[allow(clippy::needless_collect)] #[allow(clippy::needless_collect)]
pub fn unlock_accounts<'a>( pub fn unlock_accounts<'a>(

View File

@ -230,7 +230,7 @@ impl ExecuteTimings {
} }
type BankStatusCache = StatusCache<Result<()>>; type BankStatusCache = StatusCache<Result<()>>;
#[frozen_abi(digest = "5Br3PNyyX1L7XoS4jYLt5JTeMXowLSsu7v9LhokC8vnq")] #[frozen_abi(digest = "7bCDimGo11ajw6ZHViBBu8KPfoDZBcwSnumWCU8MMuwr")]
pub type BankSlotDelta = SlotDelta<Result<()>>; pub type BankSlotDelta = SlotDelta<Result<()>>;
type TransactionAccountRefCells = Vec<(Pubkey, Rc<RefCell<AccountSharedData>>)>; type TransactionAccountRefCells = Vec<(Pubkey, Rc<RefCell<AccountSharedData>>)>;
@ -3357,6 +3357,22 @@ impl Bank {
TransactionBatch::new(lock_results, self, Cow::Borrowed(txs)) TransactionBatch::new(lock_results, self, Cow::Borrowed(txs))
} }
/// Prepare a locked transaction batch from a list of sanitized transactions, and their cost
/// limited packing status
pub fn prepare_sanitized_batch_with_results<'a, 'b>(
&'a self,
transactions: &'b [SanitizedTransaction],
transaction_results: impl Iterator<Item = &'b Result<()>>,
) -> TransactionBatch<'a, 'b> {
// this lock_results could be: Ok, AccountInUse, WouldExceedBlockMaxLimit or WouldExceedAccountMaxLimit
let lock_results = self.rc.accounts.lock_accounts_with_results(
transactions.iter(),
transaction_results,
self.demote_program_write_locks(),
);
TransactionBatch::new(lock_results, self, Cow::Borrowed(transactions))
}
/// Prepare a transaction batch without locking accounts for transaction simulation. /// Prepare a transaction batch without locking accounts for transaction simulation.
pub(crate) fn prepare_simulation_batch<'a>( pub(crate) fn prepare_simulation_batch<'a>(
&'a self, &'a self,
@ -3772,6 +3788,8 @@ impl Bank {
error_counters.account_in_use += 1; error_counters.account_in_use += 1;
Some(index) Some(index)
} }
Err(TransactionError::WouldExceedMaxBlockCostLimit)
| Err(TransactionError::WouldExceedMaxAccountCostLimit) => Some(index),
Err(_) => None, Err(_) => None,
Ok(_) => None, Ok(_) => None,
}) })
@ -6413,7 +6431,7 @@ pub fn goto_end_of_slot(bank: &mut Bank) {
} }
} }
fn is_simple_vote_transaction(transaction: &SanitizedTransaction) -> bool { pub fn is_simple_vote_transaction(transaction: &SanitizedTransaction) -> bool {
if transaction.message().instructions().len() == 1 { if transaction.message().instructions().len() == 1 {
let (program_pubkey, instruction) = transaction let (program_pubkey, instruction) = transaction
.message() .message()

View File

@ -4,7 +4,9 @@
//! //!
//! The main function is `calculate_cost` which returns &TransactionCost. //! The main function is `calculate_cost` which returns &TransactionCost.
//! //!
use crate::{block_cost_limits::*, execute_cost_table::ExecuteCostTable}; use crate::{
bank::is_simple_vote_transaction, block_cost_limits::*, execute_cost_table::ExecuteCostTable,
};
use log::*; use log::*;
use solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction}; use solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction};
use std::collections::HashMap; use std::collections::HashMap;
@ -12,13 +14,31 @@ use std::collections::HashMap;
const MAX_WRITABLE_ACCOUNTS: usize = 256; const MAX_WRITABLE_ACCOUNTS: usize = 256;
// costs are stored in number of 'compute unit's // costs are stored in number of 'compute unit's
#[derive(AbiExample, Default, Debug)] #[derive(AbiExample, Debug)]
pub struct TransactionCost { pub struct TransactionCost {
pub writable_accounts: Vec<Pubkey>, pub writable_accounts: Vec<Pubkey>,
pub signature_cost: u64, pub signature_cost: u64,
pub write_lock_cost: u64, pub write_lock_cost: u64,
pub data_bytes_cost: u64, pub data_bytes_cost: u64,
pub execution_cost: u64, pub execution_cost: u64,
// `cost_weight` is a multiplier to be applied to tx cost, that
// allows to increase/decrease tx cost linearly based on algo.
// for example, vote tx could have weight zero to bypass cost
// limit checking during block packing.
pub cost_weight: u32,
}
impl Default for TransactionCost {
fn default() -> Self {
Self {
writable_accounts: Vec::with_capacity(MAX_WRITABLE_ACCOUNTS),
signature_cost: 0u64,
write_lock_cost: 0u64,
data_bytes_cost: 0u64,
execution_cost: 0u64,
cost_weight: 1u32,
}
}
} }
impl TransactionCost { impl TransactionCost {
@ -35,6 +55,7 @@ impl TransactionCost {
self.write_lock_cost = 0; self.write_lock_cost = 0;
self.data_bytes_cost = 0; self.data_bytes_cost = 0;
self.execution_cost = 0; self.execution_cost = 0;
self.cost_weight = 1;
} }
pub fn sum(&self) -> u64 { pub fn sum(&self) -> u64 {
@ -95,6 +116,7 @@ impl CostModel {
self.get_write_lock_cost(&mut tx_cost, transaction, demote_program_write_locks); self.get_write_lock_cost(&mut tx_cost, transaction, demote_program_write_locks);
tx_cost.data_bytes_cost = self.get_data_bytes_cost(transaction); tx_cost.data_bytes_cost = self.get_data_bytes_cost(transaction);
tx_cost.execution_cost = self.get_transaction_cost(transaction); tx_cost.execution_cost = self.get_transaction_cost(transaction);
tx_cost.cost_weight = self.calculate_cost_weight(transaction);
debug!("transaction {:?} has cost {:?}", transaction, tx_cost); debug!("transaction {:?} has cost {:?}", transaction, tx_cost);
tx_cost tx_cost
@ -177,6 +199,15 @@ impl CostModel {
} }
} }
} }
fn calculate_cost_weight(&self, transaction: &SanitizedTransaction) -> u32 {
if is_simple_vote_transaction(transaction) {
// vote has zero cost weight, so it bypasses block cost limit checking
0u32
} else {
1u32
}
}
} }
#[cfg(test)] #[cfg(test)]
@ -196,6 +227,7 @@ mod tests {
system_program, system_transaction, system_program, system_transaction,
transaction::Transaction, transaction::Transaction,
}; };
use solana_vote_program::vote_transaction;
use std::{ use std::{
str::FromStr, str::FromStr,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
@ -394,6 +426,7 @@ mod tests {
assert_eq!(expected_account_cost, tx_cost.write_lock_cost); assert_eq!(expected_account_cost, tx_cost.write_lock_cost);
assert_eq!(expected_execution_cost, tx_cost.execution_cost); assert_eq!(expected_execution_cost, tx_cost.execution_cost);
assert_eq!(2, tx_cost.writable_accounts.len()); assert_eq!(2, tx_cost.writable_accounts.len());
assert_eq!(1u32, tx_cost.cost_weight);
} }
#[test] #[test]
@ -500,4 +533,31 @@ mod tests {
.get_cost(&solana_vote_program::id()) .get_cost(&solana_vote_program::id())
.is_some()); .is_some());
} }
#[test]
fn test_calculate_cost_weight() {
let (mint_keypair, start_hash) = test_setup();
let keypair = Keypair::new();
let simple_transaction = SanitizedTransaction::from_transaction_for_tests(
system_transaction::transfer(&mint_keypair, &keypair.pubkey(), 2, start_hash),
);
let vote_transaction = SanitizedTransaction::from_transaction_for_tests(
vote_transaction::new_vote_transaction(
vec![42],
Hash::default(),
Hash::default(),
&keypair,
&keypair,
&keypair,
None,
),
);
let testee = CostModel::default();
// For now, vote has zero weight, everything else is neutral, for now
assert_eq!(1u32, testee.calculate_cost_weight(&simple_transaction));
assert_eq!(0u32, testee.calculate_cost_weight(&vote_transaction));
}
} }

View File

@ -72,7 +72,7 @@ impl CostTracker {
_transaction: &SanitizedTransaction, _transaction: &SanitizedTransaction,
tx_cost: &TransactionCost, tx_cost: &TransactionCost,
) -> Result<u64, CostTrackerError> { ) -> Result<u64, CostTrackerError> {
let cost = tx_cost.sum(); let cost = tx_cost.sum() * tx_cost.cost_weight as u64;
self.would_fit(&tx_cost.writable_accounts, &cost)?; self.would_fit(&tx_cost.writable_accounts, &cost)?;
self.add_transaction(&tx_cost.writable_accounts, &cost); self.add_transaction(&tx_cost.writable_accounts, &cost);
Ok(self.block_cost) Ok(self.block_cost)
@ -369,4 +369,26 @@ mod tests {
assert_eq!(acct2, costliest_account); assert_eq!(acct2, costliest_account);
} }
} }
#[test]
fn test_try_add_with_cost_weight() {
let (mint_keypair, start_hash) = test_setup();
let (tx, _keys, _cost) = build_simple_transaction(&mint_keypair, &start_hash);
let tx = SanitizedTransaction::from_transaction_for_tests(tx);
let limit = 100u64;
let mut testee = CostTracker::new(limit, limit);
let mut cost = TransactionCost {
execution_cost: limit + 1,
..TransactionCost::default()
};
// cost exceed limit by 1, will not fit
assert!(testee.try_add(&tx, &cost).is_err());
cost.cost_weight = 0u32;
// setting cost_weight to zero will allow this tx
assert!(testee.try_add(&tx, &cost).is_ok());
}
} }

View File

@ -110,9 +110,8 @@ pub enum TransactionError {
#[error("Transaction processing left an account with an outstanding borrowed reference")] #[error("Transaction processing left an account with an outstanding borrowed reference")]
AccountBorrowOutstanding, AccountBorrowOutstanding,
#[error( /// Transaction would exceed max Block Cost Limit
"Transaction could not fit into current block without exceeding the Max Block Cost Limit" #[error("Transaction would exceed max Block Cost Limit")]
)]
WouldExceedMaxBlockCostLimit, WouldExceedMaxBlockCostLimit,
/// Transaction version is unsupported /// Transaction version is unsupported
@ -122,6 +121,10 @@ pub enum TransactionError {
/// Transaction loads a writable account that cannot be written /// Transaction loads a writable account that cannot be written
#[error("Transaction loads a writable account that cannot be written")] #[error("Transaction loads a writable account that cannot be written")]
InvalidWritableAccount, InvalidWritableAccount,
/// Transaction would exceed max account limit within the block
#[error("Transaction would exceed max account limit within the block")]
WouldExceedMaxAccountCostLimit,
} }
pub type Result<T> = result::Result<T, TransactionError>; pub type Result<T> = result::Result<T, TransactionError>;

View File

@ -44,6 +44,7 @@ enum TransactionErrorType {
WOULD_EXCEED_MAX_BLOCK_COST_LIMIT = 17; WOULD_EXCEED_MAX_BLOCK_COST_LIMIT = 17;
UNSUPPORTED_VERSION = 18; UNSUPPORTED_VERSION = 18;
INVALID_WRITABLE_ACCOUNT = 19; INVALID_WRITABLE_ACCOUNT = 19;
WOULD_EXCEED_MAX_ACCOUNT_COST_LIMIT = 20;
} }
message InstructionError { message InstructionError {

View File

@ -553,6 +553,7 @@ impl TryFrom<tx_by_addr::TransactionError> for TransactionError {
17 => TransactionError::WouldExceedMaxBlockCostLimit, 17 => TransactionError::WouldExceedMaxBlockCostLimit,
18 => TransactionError::UnsupportedVersion, 18 => TransactionError::UnsupportedVersion,
19 => TransactionError::InvalidWritableAccount, 19 => TransactionError::InvalidWritableAccount,
20 => TransactionError::WouldExceedMaxAccountCostLimit,
_ => return Err("Invalid TransactionError"), _ => return Err("Invalid TransactionError"),
}) })
} }
@ -620,6 +621,9 @@ impl From<TransactionError> for tx_by_addr::TransactionError {
TransactionError::InvalidWritableAccount => { TransactionError::InvalidWritableAccount => {
tx_by_addr::TransactionErrorType::InvalidWritableAccount tx_by_addr::TransactionErrorType::InvalidWritableAccount
} }
TransactionError::WouldExceedMaxAccountCostLimit => {
tx_by_addr::TransactionErrorType::WouldExceedMaxAccountCostLimit
}
} as i32, } as i32,
instruction_error: match transaction_error { instruction_error: match transaction_error {
TransactionError::InstructionError(index, ref instruction_error) => { TransactionError::InstructionError(index, ref instruction_error) => {
@ -1031,6 +1035,14 @@ mod test {
tx_by_addr_transaction_error.try_into().unwrap() tx_by_addr_transaction_error.try_into().unwrap()
); );
let transaction_error = TransactionError::WouldExceedMaxAccountCostLimit;
let tx_by_addr_transaction_error: tx_by_addr::TransactionError =
transaction_error.clone().into();
assert_eq!(
transaction_error,
tx_by_addr_transaction_error.try_into().unwrap()
);
let transaction_error = TransactionError::UnsupportedVersion; let transaction_error = TransactionError::UnsupportedVersion;
let tx_by_addr_transaction_error: tx_by_addr::TransactionError = let tx_by_addr_transaction_error: tx_by_addr::TransactionError =
transaction_error.clone().into(); transaction_error.clone().into();