Aggregate cost_model into cost_tracker (#18374)

* * aggregate cost_model into cost_tracker, decouple it from banking_stage to prevent accidental deadlock. * Simplified code, removed unused functions

* review fixes
This commit is contained in:
Tao Zhu 2021-07-06 10:41:25 -05:00 committed by GitHub
parent 660788d227
commit 0e039b4094
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 168 additions and 310 deletions

View File

@ -4,7 +4,7 @@ use crossbeam_channel::unbounded;
use log::*;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana_core::{banking_stage::BankingStage, cost_model::CostModel};
use solana_core::{banking_stage::BankingStage, cost_model::CostModel, cost_tracker::CostTracker};
use solana_gossip::{cluster_info::ClusterInfo, cluster_info::Node};
use solana_ledger::{
blockstore::Blockstore,
@ -224,7 +224,9 @@ fn main() {
vote_receiver,
None,
replay_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
);
poh_recorder.lock().unwrap().set_bank(&bank);

View File

@ -93,8 +93,9 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(RwLock::new(CostTracker::new(std::u64::MAX, std::u64::MAX))),
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::new(std::u64::MAX, std::u64::MAX),
))))),
);
});
@ -208,14 +209,16 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(cluster_info);
let (s, _r) = unbounded();
let _banking_stage = BankingStage::new_with_cost_limit(
let _banking_stage = BankingStage::new(
&cluster_info,
&poh_recorder,
verified_receiver,
vote_receiver,
None,
s,
&Arc::new(RwLock::new(CostModel::new(std::u64::MAX, std::u64::MAX))),
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::new(std::u64::MAX, std::u64::MAX),
))))),
);
poh_recorder.lock().unwrap().set_bank(&bank);

View File

@ -1,10 +1,7 @@
//! The `banking_stage` processes Transaction messages. It is intended to be used
//! to contruct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use crate::{
cost_model::CostModel, cost_model::TransactionCost, cost_tracker::CostTracker,
packet_hasher::PacketHasher,
};
use crate::{cost_tracker::CostTracker, packet_hasher::PacketHasher};
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools;
use lru::LruCache;
@ -79,8 +76,6 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
const DEFAULT_LRU_SIZE: usize = 200_000;
const MAX_WRITABLE_ACCOUNTS: usize = 256;
#[derive(Debug, Default)]
pub struct BankingStageStats {
last_report: AtomicU64,
@ -271,38 +266,8 @@ impl BankingStage {
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: Arc<RwLock<CostTracker>>,
) -> Self {
Self::new_with_cost_limit(
cluster_info,
poh_recorder,
verified_receiver,
verified_vote_receiver,
transaction_status_sender,
gossip_vote_sender,
cost_model,
)
}
pub fn new_with_cost_limit(
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: &Arc<RwLock<CostModel>>,
) -> Self {
// 'cost_tracker' tracks bank's cost against configured limits.
let cost_tracker = {
let cost_model = cost_model.read().unwrap();
CostTracker::new(
cost_model.get_account_cost_limit(),
cost_model.get_block_cost_limit(),
)
};
let cost_tracker = Arc::new(RwLock::new(cost_tracker));
Self::new_num_threads(
cluster_info,
poh_recorder,
@ -311,8 +276,7 @@ impl BankingStage {
Self::num_threads(),
transaction_status_sender,
gossip_vote_sender,
cost_model,
&cost_tracker,
cost_tracker,
)
}
@ -324,8 +288,7 @@ impl BankingStage {
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
cost_tracker: Arc<RwLock<CostTracker>>,
) -> Self {
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
// Single thread to generate entries from many banks.
@ -351,7 +314,6 @@ impl BankingStage {
let transaction_status_sender = transaction_status_sender.clone();
let gossip_vote_sender = gossip_vote_sender.clone();
let duplicates = duplicates.clone();
let cost_model = cost_model.clone();
let cost_tracker = cost_tracker.clone();
Builder::new()
.name("solana-banking-stage-tx".to_string())
@ -367,7 +329,6 @@ impl BankingStage {
transaction_status_sender,
gossip_vote_sender,
&duplicates,
&cost_model,
&cost_tracker,
);
})
@ -438,7 +399,6 @@ impl BankingStage {
test_fn: Option<impl Fn()>,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
) {
let mut rebuffered_packets_len = 0;
@ -457,7 +417,6 @@ impl BankingStage {
original_unprocessed_indexes,
my_pubkey,
*next_leader,
cost_model,
cost_tracker,
banking_stage_stats,
);
@ -483,7 +442,6 @@ impl BankingStage {
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
cost_model,
cost_tracker,
);
if processed < verified_txs_len
@ -587,7 +545,6 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
) -> BufferedPacketsDecision {
let bank_start;
@ -636,7 +593,6 @@ impl BankingStage {
None::<Box<dyn Fn()>>,
banking_stage_stats,
recorder,
cost_model,
cost_tracker,
);
}
@ -707,7 +663,6 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
) {
let recorder = poh_recorder.lock().unwrap().recorder();
@ -728,7 +683,6 @@ impl BankingStage {
&gossip_vote_sender,
&banking_stage_stats,
&recorder,
cost_model,
cost_tracker,
);
if matches!(decision, BufferedPacketsDecision::Hold)
@ -764,7 +718,6 @@ impl BankingStage {
&banking_stage_stats,
duplicates,
&recorder,
cost_model,
cost_tracker,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
@ -1106,7 +1059,6 @@ impl BankingStage {
msgs: &Packets,
transaction_indexes: &[usize],
secp256k1_program_enabled: bool,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
banking_stage_stats: &BankingStageStats,
) -> (Vec<HashedTransaction<'static>>, Vec<usize>, Vec<usize>) {
@ -1129,18 +1081,12 @@ impl BankingStage {
);
let mut cost_tracker_check_time = Measure::start("cost_tracker_check_time");
let mut tx_cost = TransactionCost::new_with_capacity(MAX_WRITABLE_ACCOUNTS);
let filtered_transactions_with_packet_indexes: Vec<_> = {
let cost_model_readonly = cost_model.read().unwrap();
let cost_tracker_readonly = cost_tracker.read().unwrap();
verified_transactions_with_packet_indexes
.into_iter()
.filter_map(|(tx, tx_index)| {
cost_model_readonly.calculate_cost_no_alloc(&tx, &mut tx_cost);
let result = cost_tracker_readonly.would_fit(
&tx_cost.writable_accounts,
&(tx_cost.account_access_cost + tx_cost.execution_cost),
);
let result = cost_tracker_readonly.would_transaction_fit(&tx);
if result.is_err() {
debug!("transaction {:?} would exceed limit: {:?}", tx, result);
retryable_transaction_packet_indexes.push(tx_index);
@ -1226,7 +1172,6 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
) -> (usize, usize, Vec<usize>) {
let mut packet_conversion_time = Measure::start("packet_conversion");
@ -1235,7 +1180,6 @@ impl BankingStage {
msgs,
&packet_indexes,
bank.secp256k1_program_enabled(),
cost_model,
cost_tracker,
banking_stage_stats,
);
@ -1272,23 +1216,14 @@ impl BankingStage {
// applying cost of processed transactions to shared cost_tracker
let mut cost_tracking_time = Measure::start("cost_tracking_time");
let mut tx_cost = TransactionCost::new_with_capacity(MAX_WRITABLE_ACCOUNTS);
{
//let cost_model_readonly = cost_model.read().unwrap();
//let mut cost_tracker_mutable = cost_tracker.write().unwrap();
transactions.iter().enumerate().for_each(|(index, tx)| {
if !unprocessed_tx_indexes.iter().any(|&i| i == index) {
cost_model
.read()
.unwrap()
.calculate_cost_no_alloc(tx.transaction(), &mut tx_cost);
cost_tracker.write().unwrap().add_transaction(
&tx_cost.writable_accounts,
&(tx_cost.account_access_cost + tx_cost.execution_cost),
);
}
});
}
transactions.iter().enumerate().for_each(|(index, tx)| {
if unprocessed_tx_indexes.iter().all(|&i| i != index) {
cost_tracker
.write()
.unwrap()
.add_transaction_cost(tx.transaction());
}
});
cost_tracking_time.stop();
let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time");
@ -1331,7 +1266,6 @@ impl BankingStage {
transaction_indexes: &[usize],
my_pubkey: &Pubkey,
next_leader: Option<Pubkey>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
banking_stage_stats: &BankingStageStats,
) -> Vec<usize> {
@ -1351,7 +1285,6 @@ impl BankingStage {
msgs,
transaction_indexes,
bank.secp256k1_program_enabled(),
cost_model,
cost_tracker,
banking_stage_stats,
);
@ -1414,7 +1347,6 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
recorder: &TransactionRecorder,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<RwLock<CostTracker>>,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
@ -1466,7 +1398,6 @@ impl BankingStage {
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
cost_model,
cost_tracker,
);
@ -1498,7 +1429,6 @@ impl BankingStage {
&packet_indexes,
my_pubkey,
next_leader,
cost_model,
cost_tracker,
banking_stage_stats,
);
@ -1637,7 +1567,7 @@ fn next_leader_tpu_forwards(
#[cfg(test)]
mod tests {
use super::*;
use crate::cost_model::{ACCOUNT_MAX_COST, BLOCK_MAX_COST};
use crate::cost_model::CostModel;
use crossbeam_channel::unbounded;
use itertools::Itertools;
use solana_gossip::cluster_info::Node;
@ -1698,7 +1628,9 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
);
drop(verified_sender);
drop(vote_sender);
@ -1744,7 +1676,9 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
);
trace!("sending bank");
drop(verified_sender);
@ -1814,7 +1748,9 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
);
// fund another account so we can send 2 good transactions in a single batch.
@ -1963,11 +1899,9 @@ mod tests {
2,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(RwLock::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
))),
Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
);
// wait for banking_stage to eat the packets
@ -2788,11 +2722,9 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(RwLock::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
))),
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
);
assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
// When the poh recorder has a bank, should process all non conflicting buffered packets.
@ -2809,11 +2741,9 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(RwLock::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
))),
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
);
if num_expected_unprocessed == 0 {
assert!(buffered_packets.is_empty())
@ -2879,11 +2809,9 @@ mod tests {
test_fn,
&BankingStageStats::default(),
&recorder,
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(RwLock::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
))),
&Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new(
CostModel::default(),
))))),
);
// Check everything is correct. All indexes after `interrupted_iteration`

View File

@ -5,11 +5,11 @@
//! Instructions take time to execute, both historical and runtime data are
//! used to determine each instruction's execution time, the sum of that
//! is transaction's "execution cost"
//! The main function is `calculate_cost` which returns a TransactionCost struct.
//! The main function is `calculate_cost` which returns &TransactionCost.
//!
use crate::execute_cost_table::ExecuteCostTable;
use log::*;
use solana_sdk::{message::Message, pubkey::Pubkey, transaction::Transaction};
use solana_sdk::{pubkey::Pubkey, transaction::Transaction};
use std::collections::HashMap;
// Guestimated from mainnet-beta data, sigver averages 1us, average read 7us and average write 25us
@ -26,6 +26,7 @@ const SIGNED_WRITABLE_ACCOUNT_ACCESS_COST: u64 =
pub const ACCOUNT_MAX_COST: u64 = 100_000_000;
pub const BLOCK_MAX_COST: u64 = 2_500_000_000;
const MAX_WRITABLE_ACCOUNTS: usize = 256;
const DEMOTE_SYSVAR_WRITE_LOCKS: bool = true;
// cost of transaction is made of account_access_cost and instruction execution_cost
@ -61,6 +62,9 @@ pub struct CostModel {
account_cost_limit: u64,
block_cost_limit: u64,
instruction_execution_cost_table: ExecuteCostTable,
// reusable variables
transaction_cost: TransactionCost,
}
impl Default for CostModel {
@ -75,6 +79,7 @@ impl CostModel {
account_cost_limit: chain_max,
block_cost_limit: block_max,
instruction_execution_cost_table: ExecuteCostTable::default(),
transaction_cost: TransactionCost::new_with_capacity(MAX_WRITABLE_ACCOUNTS),
}
}
@ -86,36 +91,8 @@ impl CostModel {
self.block_cost_limit
}
pub fn calculate_cost(&self, transaction: &Transaction) -> TransactionCost {
let (
signed_writable_accounts,
signed_readonly_accounts,
non_signed_writable_accounts,
non_signed_readonly_accounts,
) = CostModel::sort_accounts_by_type(transaction.message());
let mut cost = TransactionCost {
writable_accounts: vec![],
account_access_cost: CostModel::find_account_access_cost(
&signed_writable_accounts,
&signed_readonly_accounts,
&non_signed_writable_accounts,
&non_signed_readonly_accounts,
),
execution_cost: self.find_transaction_cost(transaction),
};
cost.writable_accounts.extend(&signed_writable_accounts);
cost.writable_accounts.extend(&non_signed_writable_accounts);
debug!("transaction {:?} has cost {:?}", transaction, cost);
cost
}
// calculate `transaction` cost, the result is passed back to caller via mutable
// parameter `cost`. Existing content in `cost` will be erased before adding new content
// This is to allow this function to reuse pre-allocated memory, as this function
// is often on hot-path.
pub fn calculate_cost_no_alloc(&self, transaction: &Transaction, cost: &mut TransactionCost) {
cost.reset();
pub fn calculate_cost(&mut self, transaction: &Transaction) -> &TransactionCost {
self.transaction_cost.reset();
let message = transaction.message();
message.account_keys.iter().enumerate().for_each(|(i, k)| {
@ -123,19 +100,25 @@ impl CostModel {
let is_writable = message.is_writable(i, DEMOTE_SYSVAR_WRITE_LOCKS);
if is_signer && is_writable {
cost.writable_accounts.push(*k);
cost.account_access_cost += SIGNED_WRITABLE_ACCOUNT_ACCESS_COST;
self.transaction_cost.writable_accounts.push(*k);
self.transaction_cost.account_access_cost += SIGNED_WRITABLE_ACCOUNT_ACCESS_COST;
} else if is_signer && !is_writable {
cost.account_access_cost += SIGNED_READONLY_ACCOUNT_ACCESS_COST;
self.transaction_cost.account_access_cost += SIGNED_READONLY_ACCOUNT_ACCESS_COST;
} else if !is_signer && is_writable {
cost.writable_accounts.push(*k);
cost.account_access_cost += NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST;
self.transaction_cost.writable_accounts.push(*k);
self.transaction_cost.account_access_cost +=
NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST;
} else {
cost.account_access_cost += NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST;
self.transaction_cost.account_access_cost +=
NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST;
}
});
cost.execution_cost = self.find_transaction_cost(transaction);
debug!("transaction {:?} has cost {:?}", transaction, cost);
self.transaction_cost.execution_cost = self.find_transaction_cost(transaction);
debug!(
"transaction {:?} has cost {:?}",
transaction, self.transaction_cost
);
&self.transaction_cost
}
// To update or insert instruction cost to table.
@ -186,50 +169,6 @@ impl CostModel {
}
cost
}
fn find_account_access_cost(
signed_writable_accounts: &[Pubkey],
signed_readonly_accounts: &[Pubkey],
non_signed_writable_accounts: &[Pubkey],
non_signed_readonly_accounts: &[Pubkey],
) -> u64 {
let mut cost = 0;
cost += signed_writable_accounts.len() as u64 * SIGNED_WRITABLE_ACCOUNT_ACCESS_COST;
cost += signed_readonly_accounts.len() as u64 * SIGNED_READONLY_ACCOUNT_ACCESS_COST;
cost += non_signed_writable_accounts.len() as u64 * NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST;
cost += non_signed_readonly_accounts.len() as u64 * NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST;
cost
}
fn sort_accounts_by_type(
message: &Message,
) -> (Vec<Pubkey>, Vec<Pubkey>, Vec<Pubkey>, Vec<Pubkey>) {
let demote_sysvar_write_locks = true;
let mut signer_writable: Vec<Pubkey> = vec![];
let mut signer_readonly: Vec<Pubkey> = vec![];
let mut non_signer_writable: Vec<Pubkey> = vec![];
let mut non_signer_readonly: Vec<Pubkey> = vec![];
message.account_keys.iter().enumerate().for_each(|(i, k)| {
let is_signer = message.is_signer(i);
let is_writable = message.is_writable(i, demote_sysvar_write_locks);
if is_signer && is_writable {
signer_writable.push(*k);
} else if is_signer && !is_writable {
signer_readonly.push(*k);
} else if !is_signer && is_writable {
non_signer_writable.push(*k);
} else {
non_signer_readonly.push(*k);
}
});
(
signer_writable,
signer_readonly,
non_signer_writable,
non_signer_readonly,
)
}
}
#[cfg(test)]
@ -387,25 +326,14 @@ mod tests {
vec![prog1, prog2],
instructions,
);
debug!("many random transaction {:?}", tx);
let (
signed_writable_accounts,
signed_readonly_accounts,
non_signed_writable_accounts,
non_signed_readonly_accounts,
) = CostModel::sort_accounts_by_type(tx.message());
assert_eq!(2, signed_writable_accounts.len());
assert_eq!(signer1.pubkey(), signed_writable_accounts[0]);
assert_eq!(signer2.pubkey(), signed_writable_accounts[1]);
assert_eq!(0, signed_readonly_accounts.len());
assert_eq!(2, non_signed_writable_accounts.len());
assert_eq!(key1, non_signed_writable_accounts[0]);
assert_eq!(key2, non_signed_writable_accounts[1]);
assert_eq!(2, non_signed_readonly_accounts.len());
assert_eq!(prog1, non_signed_readonly_accounts[0]);
assert_eq!(prog2, non_signed_readonly_accounts[1]);
let mut cost_model = CostModel::default();
let tx_cost = cost_model.calculate_cost(&tx);
assert_eq!(2 + 2, tx_cost.writable_accounts.len());
assert_eq!(signer1.pubkey(), tx_cost.writable_accounts[0]);
assert_eq!(signer2.pubkey(), tx_cost.writable_accounts[1]);
assert_eq!(key1, tx_cost.writable_accounts[2]);
assert_eq!(key2, tx_cost.writable_accounts[3]);
}
#[test]
@ -448,33 +376,6 @@ mod tests {
assert_eq!(2, tx_cost.writable_accounts.len());
}
#[test]
fn test_cost_model_calculate_cost_no_alloc() {
let (mint_keypair, start_hash) = test_setup();
let tx =
system_transaction::transfer(&mint_keypair, &Keypair::new().pubkey(), 2, start_hash);
let expected_account_cost = SIGNED_WRITABLE_ACCOUNT_ACCESS_COST
+ NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST
+ NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST;
let expected_execution_cost = 8;
let mut cost_model = CostModel::default();
cost_model
.upsert_instruction_cost(&system_program::id(), &expected_execution_cost)
.unwrap();
// allocate cost, set some random number
let mut tx_cost = TransactionCost::new_with_capacity(8);
tx_cost.execution_cost = 101;
tx_cost.writable_accounts.push(Pubkey::new_unique());
cost_model.calculate_cost_no_alloc(&tx, &mut tx_cost);
assert_eq!(expected_account_cost, tx_cost.account_access_cost);
assert_eq!(expected_execution_cost, tx_cost.execution_cost);
assert_eq!(2, tx_cost.writable_accounts.len());
}
#[test]
fn test_cost_model_update_instruction_cost() {
let key1 = Pubkey::new_unique();
@ -493,43 +394,6 @@ mod tests {
assert_eq!(updated_cost, cost_model.find_instruction_cost(&key1));
}
#[test]
fn test_cost_model_can_be_shared_concurrently_as_immutable() {
let (mint_keypair, start_hash) = test_setup();
let number_threads = 10;
let expected_account_cost = SIGNED_WRITABLE_ACCOUNT_ACCESS_COST
+ NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST
+ NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST;
let cost_model = Arc::new(CostModel::default());
let thread_handlers: Vec<JoinHandle<()>> = (0..number_threads)
.map(|_| {
// each thread creates its own simple transaction
let simple_transaction = system_transaction::transfer(
&mint_keypair,
&Keypair::new().pubkey(),
2,
start_hash,
);
let cost_model = cost_model.clone();
thread::spawn(move || {
let tx_cost = cost_model.calculate_cost(&simple_transaction);
assert_eq!(2, tx_cost.writable_accounts.len());
assert_eq!(expected_account_cost, tx_cost.account_access_cost);
assert_eq!(
cost_model.instruction_execution_cost_table.get_mode(),
tx_cost.execution_cost
);
})
})
.collect();
for th in thread_handlers {
th.join().unwrap();
}
}
#[test]
fn test_cost_model_can_be_shared_concurrently_with_rwlock() {
let (mint_keypair, start_hash) = test_setup();
@ -573,7 +437,8 @@ mod tests {
})
} else {
thread::spawn(move || {
let tx_cost = cost_model.read().unwrap().calculate_cost(&tx);
let mut cost_model = cost_model.write().unwrap();
let tx_cost = cost_model.calculate_cost(&tx);
assert_eq!(3, tx_cost.writable_accounts.len());
assert_eq!(expected_account_cost, tx_cost.account_access_cost);
})

View File

@ -1,14 +1,21 @@
//! `cost_tracker` keeps tracking tranasction cost per chained accounts as well as for entire block
//! The main entry function is 'try_add', if success, it returns new block cost.
//! It aggregates `cost_model`, which provides service of calculating transaction cost.
//! The main functions are:
//! - would_transaction_fit(&tx), immutable function to test if `tx` would fit into current block
//! - add_transaction_cost(&tx), mutable function to accumulate `tx` cost to tracker.
//!
use crate::cost_model::TransactionCost;
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::collections::HashMap;
use crate::cost_model::{CostModel, TransactionCost};
use solana_sdk::{clock::Slot, pubkey::Pubkey, transaction::Transaction};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
const WRITABLE_ACCOUNTS_PER_BLOCK: usize = 512;
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct CostTracker {
cost_model: Arc<RwLock<CostModel>>,
account_cost_limit: u64,
block_cost_limit: u64,
current_bank_slot: Slot,
@ -17,17 +24,47 @@ pub struct CostTracker {
}
impl CostTracker {
pub fn new(chain_max: u64, package_max: u64) -> Self {
assert!(chain_max <= package_max);
pub fn new(cost_model: Arc<RwLock<CostModel>>) -> Self {
let (account_cost_limit, block_cost_limit) = {
let cost_model = cost_model.read().unwrap();
(
cost_model.get_account_cost_limit(),
cost_model.get_block_cost_limit(),
)
};
assert!(account_cost_limit <= block_cost_limit);
Self {
account_cost_limit: chain_max,
block_cost_limit: package_max,
cost_model,
account_cost_limit,
block_cost_limit,
current_bank_slot: 0,
cost_by_writable_accounts: HashMap::with_capacity(WRITABLE_ACCOUNTS_PER_BLOCK),
block_cost: 0,
}
}
pub fn would_transaction_fit(&self, transaction: &Transaction) -> Result<(), &'static str> {
let mut cost_model = self.cost_model.write().unwrap();
let tx_cost = cost_model.calculate_cost(transaction);
self.would_fit(
&tx_cost.writable_accounts,
&(tx_cost.account_access_cost + tx_cost.execution_cost),
)
}
pub fn add_transaction_cost(&mut self, transaction: &Transaction) {
let mut cost_model = self.cost_model.write().unwrap();
let tx_cost = cost_model.calculate_cost(transaction);
let cost = tx_cost.account_access_cost + tx_cost.execution_cost;
for account_key in tx_cost.writable_accounts.iter() {
*self
.cost_by_writable_accounts
.entry(*account_key)
.or_insert(0) += cost;
}
self.block_cost += cost;
}
pub fn reset_if_new_bank(&mut self, slot: Slot) {
if slot != self.current_bank_slot {
self.current_bank_slot = slot;
@ -36,7 +73,7 @@ impl CostTracker {
}
}
pub fn try_add(&mut self, transaction_cost: TransactionCost) -> Result<u64, &'static str> {
pub fn try_add(&mut self, transaction_cost: &TransactionCost) -> Result<u64, &'static str> {
let cost = transaction_cost.account_access_cost + transaction_cost.execution_cost;
self.would_fit(&transaction_cost.writable_accounts, &cost)?;
@ -44,7 +81,7 @@ impl CostTracker {
Ok(self.block_cost)
}
pub fn would_fit(&self, keys: &[Pubkey], cost: &u64) -> Result<(), &'static str> {
fn would_fit(&self, keys: &[Pubkey], cost: &u64) -> Result<(), &'static str> {
// check against the total package cost
if self.block_cost + cost > self.block_cost_limit {
return Err("would exceed block cost limit");
@ -72,7 +109,7 @@ impl CostTracker {
Ok(())
}
pub fn add_transaction(&mut self, keys: &[Pubkey], cost: &u64) {
fn add_transaction(&mut self, keys: &[Pubkey], cost: &u64) {
for account_key in keys.iter() {
*self
.cost_by_writable_accounts
@ -86,6 +123,7 @@ impl CostTracker {
// CostStats can be collected by util, such as ledger_tool
#[derive(Default, Debug)]
pub struct CostStats {
pub bank_slot: Slot,
pub total_cost: u64,
pub number_of_accounts: usize,
pub costliest_account: Pubkey,
@ -95,6 +133,7 @@ pub struct CostStats {
impl CostTracker {
pub fn get_stats(&self) -> CostStats {
let mut stats = CostStats {
bank_slot: self.current_bank_slot,
total_cost: self.block_cost,
number_of_accounts: self.cost_by_writable_accounts.len(),
costliest_account: Pubkey::default(),
@ -152,7 +191,7 @@ mod tests {
#[test]
fn test_cost_tracker_initialization() {
let testee = CostTracker::new(10, 11);
let testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new(10, 11))));
assert_eq!(10, testee.account_cost_limit);
assert_eq!(11, testee.block_cost_limit);
assert_eq!(0, testee.cost_by_writable_accounts.len());
@ -165,7 +204,7 @@ mod tests {
let (_tx, keys, cost) = build_simple_transaction(&mint_keypair, &start_hash);
// build testee to have capacity for one simple transaction
let mut testee = CostTracker::new(cost, cost);
let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new(cost, cost))));
assert!(testee.would_fit(&keys, &cost).is_ok());
testee.add_transaction(&keys, &cost);
assert_eq!(cost, testee.block_cost);
@ -179,7 +218,10 @@ mod tests {
let (_tx2, keys2, cost2) = build_simple_transaction(&mint_keypair, &start_hash);
// build testee to have capacity for two simple transactions, with same accounts
let mut testee = CostTracker::new(cost1 + cost2, cost1 + cost2);
let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new(
cost1 + cost2,
cost1 + cost2,
))));
{
assert!(testee.would_fit(&keys1, &cost1).is_ok());
testee.add_transaction(&keys1, &cost1);
@ -201,7 +243,10 @@ mod tests {
let (_tx2, keys2, cost2) = build_simple_transaction(&second_account, &start_hash);
// build testee to have capacity for two simple transactions, with same accounts
let mut testee = CostTracker::new(cmp::max(cost1, cost2), cost1 + cost2);
let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new(
cmp::max(cost1, cost2),
cost1 + cost2,
))));
{
assert!(testee.would_fit(&keys1, &cost1).is_ok());
testee.add_transaction(&keys1, &cost1);
@ -222,7 +267,10 @@ mod tests {
let (_tx2, keys2, cost2) = build_simple_transaction(&mint_keypair, &start_hash);
// build testee to have capacity for two simple transactions, but not for same accounts
let mut testee = CostTracker::new(cmp::min(cost1, cost2), cost1 + cost2);
let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new(
cmp::min(cost1, cost2),
cost1 + cost2,
))));
// should have room for first transaction
{
assert!(testee.would_fit(&keys1, &cost1).is_ok());
@ -243,7 +291,10 @@ mod tests {
let (_tx2, keys2, cost2) = build_simple_transaction(&second_account, &start_hash);
// build testee to have capacity for each chain, but not enough room for both transactions
let mut testee = CostTracker::new(cmp::max(cost1, cost2), cost1 + cost2 - 1);
let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new(
cmp::max(cost1, cost2),
cost1 + cost2 - 1,
))));
// should have room for first transaction
{
assert!(testee.would_fit(&keys1, &cost1).is_ok());
@ -263,7 +314,10 @@ mod tests {
let (_tx2, keys2, cost2) = build_simple_transaction(&mint_keypair, &start_hash);
// build testee to have capacity for two simple transactions, but not for same accounts
let mut testee = CostTracker::new(cmp::min(cost1, cost2), cost1 + cost2);
let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new(
cmp::min(cost1, cost2),
cost1 + cost2,
))));
// should have room for first transaction
{
assert!(testee.would_fit(&keys1, &cost1).is_ok());
@ -296,7 +350,10 @@ mod tests {
let account_max = cost * 2;
let block_max = account_max * 3; // for three accts
let mut testee = CostTracker::new(account_max, block_max);
let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new(
account_max,
block_max,
))));
// case 1: a tx writes to 3 accounts, should success, we will have:
// | acct1 | $cost |
@ -309,7 +366,7 @@ mod tests {
account_access_cost: 0,
execution_cost: cost,
};
assert!(testee.try_add(tx_cost).is_ok());
assert!(testee.try_add(&tx_cost).is_ok());
let stat = testee.get_stats();
assert_eq!(cost, stat.total_cost);
assert_eq!(3, stat.number_of_accounts);
@ -327,7 +384,7 @@ mod tests {
account_access_cost: 0,
execution_cost: cost,
};
assert!(testee.try_add(tx_cost).is_ok());
assert!(testee.try_add(&tx_cost).is_ok());
let stat = testee.get_stats();
assert_eq!(cost * 2, stat.total_cost);
assert_eq!(3, stat.number_of_accounts);
@ -347,7 +404,7 @@ mod tests {
account_access_cost: 0,
execution_cost: cost,
};
assert!(testee.try_add(tx_cost).is_err());
assert!(testee.try_add(&tx_cost).is_err());
let stat = testee.get_stats();
assert_eq!(cost * 2, stat.total_cost);
assert_eq!(3, stat.number_of_accounts);

View File

@ -9,6 +9,7 @@ use crate::{
VerifiedVoteSender, VoteTracker,
},
cost_model::CostModel,
cost_tracker::CostTracker,
fetch_stage::FetchStage,
sigverify::TransactionSigVerifier,
sigverify_stage::SigVerifyStage,
@ -105,6 +106,7 @@ impl Tpu {
cluster_confirmed_slot_sender,
);
let cost_tracker = Arc::new(RwLock::new(CostTracker::new(cost_model.clone())));
let banking_stage = BankingStage::new(
cluster_info,
poh_recorder,
@ -112,7 +114,7 @@ impl Tpu {
verified_vote_packets_receiver,
transaction_status_sender,
replay_vote_sender,
cost_model,
cost_tracker,
);
let broadcast_stage = broadcast_type.new_broadcast_stage(

View File

@ -60,7 +60,7 @@ use std::{
path::{Path, PathBuf},
process::{exit, Command, Stdio},
str::FromStr,
sync::Arc,
sync::{Arc, RwLock},
};
mod bigtable;
@ -737,14 +737,15 @@ fn compute_slot_cost(blockstore: &Blockstore, slot: Slot) -> Result<(), String>
let mut transactions = 0;
let mut programs = 0;
let mut program_ids = HashMap::new();
let cost_model = CostModel::new(ACCOUNT_MAX_COST, BLOCK_MAX_COST);
let mut cost_tracker = CostTracker::new(
cost_model.get_account_cost_limit(),
cost_model.get_block_cost_limit(),
);
let cost_model = Arc::new(RwLock::new(CostModel::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
)));
let mut cost_tracker = CostTracker::new(cost_model.clone());
for entry in &entries {
transactions += entry.transactions.len();
let mut cost_model = cost_model.write().unwrap();
for transaction in &entry.transactions {
programs += transaction.message().instructions.len();
let tx_cost = cost_model.calculate_cost(transaction);