replay stage feed back program cost (#17731)

* replay stage feeds back realtime per-program execution cost to cost model;

* program cost execution table is initialized into empty table, no longer populated with hardcoded numbers;

* changed cost unit to microsecond, using value collected from mainnet;

* add ExecuteCostTable with fixed capacity for security concern, when its limit is reached, programs with old age AND less occurrence will be pushed out to make room for new programs.
This commit is contained in:
Tao Zhu 2021-06-09 17:10:59 -05:00 committed by GitHub
parent 050bb5446d
commit ae27fcbcda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 542 additions and 174 deletions

View File

@ -4,7 +4,7 @@ use crossbeam_channel::unbounded;
use log::*;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana_core::banking_stage::BankingStage;
use solana_core::{banking_stage::BankingStage, cost_model::CostModel};
use solana_gossip::{cluster_info::ClusterInfo, cluster_info::Node};
use solana_ledger::{
blockstore::Blockstore,
@ -26,7 +26,7 @@ use solana_sdk::{
transaction::Transaction,
};
use std::{
sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex},
sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex, RwLock},
thread::sleep,
time::{Duration, Instant},
};
@ -224,6 +224,7 @@ fn main() {
vote_receiver,
None,
replay_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);
poh_recorder.lock().unwrap().set_bank(&bank);

View File

@ -34,7 +34,7 @@ use solana_sdk::transaction::Transaction;
use std::collections::VecDeque;
use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
use test::Bencher;
@ -93,8 +93,8 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(CostModel::default()),
&Arc::new(Mutex::new(CostTracker::new(std::u32::MAX, std::u32::MAX))),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(std::u64::MAX, std::u64::MAX))),
);
});
@ -215,8 +215,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
vote_receiver,
None,
s,
std::u32::MAX,
std::u32::MAX,
&Arc::new(RwLock::new(CostModel::new(std::u64::MAX, std::u64::MAX))),
);
poh_recorder.lock().unwrap().set_bank(&bank);

View File

@ -1,11 +1,7 @@
//! The `banking_stage` processes Transaction messages. It is intended to be used
//! to contruct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use crate::{
cost_model::{CostModel, ACCOUNT_MAX_COST, BLOCK_MAX_COST},
cost_tracker::CostTracker,
packet_hasher::PacketHasher,
};
use crate::{cost_model::CostModel, cost_tracker::CostTracker, packet_hasher::PacketHasher};
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools;
use lru::LruCache;
@ -55,7 +51,7 @@ use std::{
net::UdpSocket,
ops::DerefMut,
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Mutex},
sync::{Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle},
time::Duration,
time::Instant,
@ -226,6 +222,7 @@ impl BankingStage {
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: &Arc<RwLock<CostModel>>,
) -> Self {
Self::new_with_cost_limit(
cluster_info,
@ -234,8 +231,7 @@ impl BankingStage {
verified_vote_receiver,
transaction_status_sender,
gossip_vote_sender,
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
cost_model,
)
}
@ -246,15 +242,12 @@ impl BankingStage {
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
account_cost_limit: u32,
block_cost_limit: u32,
cost_model: &Arc<RwLock<CostModel>>,
) -> Self {
// shared immutable 'cost_model' that calcuates transaction costs
// shared mutex guarded 'cost_tracker' tracks bank's cost against configured limits.
let cost_model = Arc::new(CostModel::new(account_cost_limit, block_cost_limit));
let cost_tracker = Arc::new(Mutex::new(CostTracker::new(
cost_model.get_account_cost_limit(),
cost_model.get_block_cost_limit(),
cost_model.read().unwrap().get_account_cost_limit(),
cost_model.read().unwrap().get_block_cost_limit(),
)));
Self::new_num_threads(
cluster_info,
@ -277,7 +270,7 @@ impl BankingStage {
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> Self {
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
@ -386,7 +379,7 @@ impl BankingStage {
test_fn: Option<impl Fn()>,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) {
let mut rebuffered_packets_len = 0;
@ -530,7 +523,7 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> BufferedPacketsDecision {
let bank_start;
@ -647,7 +640,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) {
let recorder = poh_recorder.lock().unwrap().recorder();
@ -1044,7 +1037,7 @@ impl BankingStage {
msgs: &Packets,
transaction_indexes: &[usize],
secp256k1_program_enabled: bool,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> (Vec<HashedTransaction<'static>>, Vec<usize>, Vec<usize>) {
// Making a snapshot of shared cost_tracker by clone(), drop lock immediately.
@ -1062,11 +1055,11 @@ impl BankingStage {
tx.verify_precompiles().ok()?;
}
// Get transaction cost via immutable cost_model; try to add cost to
// Get transaction cost via cost_model; try to add cost to
// local copy of cost_tracker, if suceeded, local copy is updated
// and transaction added to valid list; otherwise, transaction is
// added to retry list. No locking here.
let tx_cost = cost_model.calculate_cost(&tx);
let tx_cost = cost_model.read().unwrap().calculate_cost(&tx);
let result = cost_tracker.try_add(tx_cost);
if result.is_err() {
debug!("transaction {:?} would exceed limit: {:?}", tx, result);
@ -1139,7 +1132,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> (usize, usize, Vec<usize>) {
let mut packet_conversion_time = Measure::start("packet_conversion");
@ -1177,7 +1170,7 @@ impl BankingStage {
// applying cost of processed transactions to shared cost_tracker
transactions.iter().enumerate().for_each(|(index, tx)| {
if !unprocessed_tx_indexes.iter().any(|&i| i == index) {
let tx_cost = cost_model.calculate_cost(&tx.transaction());
let tx_cost = cost_model.read().unwrap().calculate_cost(&tx.transaction());
let mut guard = cost_tracker.lock().unwrap();
let _result = guard.try_add(tx_cost);
drop(guard);
@ -1221,7 +1214,7 @@ impl BankingStage {
transaction_indexes: &[usize],
my_pubkey: &Pubkey,
next_leader: Option<Pubkey>,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> Vec<usize> {
// Check if we are the next leader. If so, let's not filter the packets
@ -1293,7 +1286,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
recorder: &TransactionRecorder,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
@ -1515,6 +1508,7 @@ fn next_leader_tpu_forwards(
#[cfg(test)]
mod tests {
use super::*;
use crate::cost_model::{ACCOUNT_MAX_COST, BLOCK_MAX_COST};
use crossbeam_channel::unbounded;
use itertools::Itertools;
use solana_gossip::cluster_info::Node;
@ -1575,6 +1569,7 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);
drop(verified_sender);
drop(vote_sender);
@ -1620,6 +1615,7 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);
trace!("sending bank");
drop(verified_sender);
@ -1689,6 +1685,7 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);
// fund another account so we can send 2 good transactions in a single batch.
@ -1837,7 +1834,7 @@ mod tests {
2,
None,
gossip_vote_sender,
&Arc::new(CostModel::default()),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
@ -2662,7 +2659,7 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(CostModel::default()),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
@ -2683,7 +2680,7 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(CostModel::default()),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
@ -2753,7 +2750,7 @@ mod tests {
test_fn,
&BankingStageStats::default(),
&recorder,
&Arc::new(CostModel::default()),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,

View File

@ -7,33 +7,21 @@
//! is transaction's "execution cost"
//! The main function is `calculate_cost` which returns a TransactionCost struct.
//!
use crate::execute_cost_table::ExecuteCostTable;
use log::*;
use solana_sdk::{
bpf_loader, bpf_loader_deprecated, bpf_loader_upgradeable, feature, incinerator,
message::Message, native_loader, pubkey::Pubkey, secp256k1_program, system_program,
transaction::Transaction,
};
use solana_sdk::{message::Message, pubkey::Pubkey, transaction::Transaction};
use std::collections::HashMap;
// from mainnet-beta data, taking `vote program` as 1 COST_UNIT to load and execute
// amount all type programs, the costs are:
// min: 0.9 COST_UNIT
// max: 110 COST UNIT
// Median: 12 COST_UNIT
// Average: 19 COST_UNIT
const COST_UNIT: u32 = 1;
const DEFAULT_PROGRAM_COST: u32 = COST_UNIT * 100;
// re-adjust these numbers if needed
const SIGNED_WRITABLE_ACCOUNT_ACCESS_COST: u32 = COST_UNIT * 10;
const SIGNED_READONLY_ACCOUNT_ACCESS_COST: u32 = COST_UNIT * 2;
const NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST: u32 = COST_UNIT * 5;
const NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST: u32 = COST_UNIT;
// running 'ledger-tool compute-cost' over mainnet ledger, the largest block cost
// is 575_687, and the largest chain cost (eg account cost) is 559_000
// Configuring cost model to have larger block limit and smaller account limit
// to encourage packing parallelizable transactions in block.
pub const ACCOUNT_MAX_COST: u32 = COST_UNIT * 10_000;
pub const BLOCK_MAX_COST: u32 = COST_UNIT * 10_000_000;
// Guestimated from mainnet-beta data, sigver averages 1us, read averages 7us and write avergae 25us
const SIGNED_WRITABLE_ACCOUNT_ACCESS_COST: u64 = 1 + 25;
const SIGNED_READONLY_ACCOUNT_ACCESS_COST: u64 = 1 + 7;
const NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST: u64 = 25;
const NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST: u64 = 7;
// Sampled from mainnet-beta, the instruction execution timings stats are (in us):
// min=194, max=62164, avg=8214.49, med=2243
pub const ACCOUNT_MAX_COST: u64 = 100_000_000;
pub const BLOCK_MAX_COST: u64 = 2_500_000_000;
// cost of transaction is made of account_access_cost and instruction execution_cost
// where
@ -44,52 +32,15 @@ pub const BLOCK_MAX_COST: u32 = COST_UNIT * 10_000_000;
#[derive(Default, Debug)]
pub struct TransactionCost {
pub writable_accounts: Vec<Pubkey>,
pub account_access_cost: u32,
pub execution_cost: u32,
}
// instruction execution code table is initialized with default values, and
// updated with realtime information (by Replay)
#[derive(Debug)]
struct InstructionExecutionCostTable {
pub table: HashMap<Pubkey, u32>,
}
macro_rules! costmetrics {
($( $key: expr => $val: expr ),*) => {{
let mut hashmap: HashMap< Pubkey, u32 > = HashMap::new();
$( hashmap.insert( $key, $val); )*
hashmap
}}
}
impl InstructionExecutionCostTable {
// build cost table with default value
pub fn new() -> Self {
Self {
table: costmetrics![
solana_config_program::id() => COST_UNIT,
feature::id() => COST_UNIT * 2,
incinerator::id() => COST_UNIT * 2,
native_loader::id() => COST_UNIT * 2,
solana_stake_program::id() => COST_UNIT * 2,
solana_stake_program::config::id() => COST_UNIT,
solana_vote_program::id() => COST_UNIT,
secp256k1_program::id() => COST_UNIT,
system_program::id() => COST_UNIT * 8,
bpf_loader::id() => COST_UNIT * 500,
bpf_loader_deprecated::id() => COST_UNIT * 500,
bpf_loader_upgradeable::id() => COST_UNIT * 500
],
}
}
pub account_access_cost: u64,
pub execution_cost: u64,
}
#[derive(Debug)]
pub struct CostModel {
account_cost_limit: u32,
block_cost_limit: u32,
instruction_execution_cost_table: InstructionExecutionCostTable,
account_cost_limit: u64,
block_cost_limit: u64,
instruction_execution_cost_table: ExecuteCostTable,
}
impl Default for CostModel {
@ -99,19 +50,19 @@ impl Default for CostModel {
}
impl CostModel {
pub fn new(chain_max: u32, block_max: u32) -> Self {
pub fn new(chain_max: u64, block_max: u64) -> Self {
Self {
account_cost_limit: chain_max,
block_cost_limit: block_max,
instruction_execution_cost_table: InstructionExecutionCostTable::new(),
instruction_execution_cost_table: ExecuteCostTable::default(),
}
}
pub fn get_account_cost_limit(&self) -> u32 {
pub fn get_account_cost_limit(&self) -> u64 {
self.account_cost_limit
}
pub fn get_block_cost_limit(&self) -> u32 {
pub fn get_block_cost_limit(&self) -> u64 {
self.block_cost_limit
}
@ -140,40 +91,39 @@ impl CostModel {
}
// To update or insert instruction cost to table.
// When updating, uses the average of new and old values to smooth out outliers
pub fn upsert_instruction_cost(
&mut self,
program_key: &Pubkey,
cost: &u32,
) -> Result<u32, &'static str> {
let instruction_cost = self
.instruction_execution_cost_table
.table
.entry(*program_key)
.or_insert(*cost);
*instruction_cost = (*instruction_cost + *cost) / 2;
Ok(*instruction_cost)
cost: &u64,
) -> Result<u64, &'static str> {
self.instruction_execution_cost_table
.upsert(program_key, cost);
match self.instruction_execution_cost_table.get_cost(program_key) {
Some(cost) => Ok(*cost),
None => Err("failed to upsert to ExecuteCostTable"),
}
}
fn find_instruction_cost(&self, program_key: &Pubkey) -> u32 {
match self
.instruction_execution_cost_table
.table
.get(&program_key)
{
pub fn get_instruction_cost_table(&self) -> &HashMap<Pubkey, u64> {
self.instruction_execution_cost_table.get_cost_table()
}
fn find_instruction_cost(&self, program_key: &Pubkey) -> u64 {
match self.instruction_execution_cost_table.get_cost(&program_key) {
Some(cost) => *cost,
None => {
let default_value = self.instruction_execution_cost_table.get_mode();
debug!(
"Program key {:?} does not have assigned cost, using default {}",
program_key, DEFAULT_PROGRAM_COST
"Program key {:?} does not have assigned cost, using mode {}",
program_key, default_value
);
DEFAULT_PROGRAM_COST
default_value
}
}
}
fn find_transaction_cost(&self, transaction: &Transaction) -> u32 {
let mut cost: u32 = 0;
fn find_transaction_cost(&self, transaction: &Transaction) -> u64 {
let mut cost: u64 = 0;
for instruction in &transaction.message().instructions {
let program_id =
@ -194,12 +144,12 @@ impl CostModel {
signed_readonly_accounts: &[Pubkey],
non_signed_writable_accounts: &[Pubkey],
non_signed_readonly_accounts: &[Pubkey],
) -> u32 {
) -> u64 {
let mut cost = 0;
cost += signed_writable_accounts.len() as u32 * SIGNED_WRITABLE_ACCOUNT_ACCESS_COST;
cost += signed_readonly_accounts.len() as u32 * SIGNED_READONLY_ACCOUNT_ACCESS_COST;
cost += non_signed_writable_accounts.len() as u32 * NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST;
cost += non_signed_readonly_accounts.len() as u32 * NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST;
cost += signed_writable_accounts.len() as u64 * SIGNED_WRITABLE_ACCOUNT_ACCESS_COST;
cost += signed_readonly_accounts.len() as u64 * SIGNED_READONLY_ACCOUNT_ACCESS_COST;
cost += non_signed_writable_accounts.len() as u64 * NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST;
cost += non_signed_readonly_accounts.len() as u64 * NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST;
cost
}
@ -242,12 +192,13 @@ mod tests {
genesis_utils::{create_genesis_config, GenesisConfigInfo},
};
use solana_sdk::{
bpf_loader,
hash::Hash,
instruction::CompiledInstruction,
message::Message,
signature::{Keypair, Signer},
system_instruction::{self},
system_transaction,
system_program, system_transaction,
};
use std::{
str::FromStr,
@ -269,23 +220,21 @@ mod tests {
#[test]
fn test_cost_model_instruction_cost() {
let testee = CostModel::default();
let mut testee = CostModel::default();
let known_key = Pubkey::from_str("known11111111111111111111111111111111111111").unwrap();
testee.upsert_instruction_cost(&known_key, &100).unwrap();
// find cost for known programs
assert_eq!(
COST_UNIT,
testee.find_instruction_cost(
&Pubkey::from_str("Vote111111111111111111111111111111111111111").unwrap()
)
);
assert_eq!(
COST_UNIT * 500,
testee.find_instruction_cost(&bpf_loader::id())
);
assert_eq!(100, testee.find_instruction_cost(&known_key));
testee
.upsert_instruction_cost(&bpf_loader::id(), &1999)
.unwrap();
assert_eq!(1999, testee.find_instruction_cost(&bpf_loader::id()));
// unknown program is assigned with default cost
assert_eq!(
DEFAULT_PROGRAM_COST,
testee.instruction_execution_cost_table.get_mode(),
testee.find_instruction_cost(
&Pubkey::from_str("unknown111111111111111111111111111111111111").unwrap()
)
@ -305,9 +254,12 @@ mod tests {
);
// expected cost for one system transfer instructions
let expected_cost = COST_UNIT * 8;
let expected_cost = 8;
let testee = CostModel::default();
let mut testee = CostModel::default();
testee
.upsert_instruction_cost(&system_program::id(), &expected_cost)
.unwrap();
assert_eq!(
expected_cost,
testee.find_transaction_cost(&simple_transaction)
@ -327,9 +279,13 @@ mod tests {
debug!("many transfer transaction {:?}", tx);
// expected cost for two system transfer instructions
let expected_cost = COST_UNIT * 8 * 2;
let program_cost = 8;
let expected_cost = program_cost * 2;
let testee = CostModel::default();
let mut testee = CostModel::default();
testee
.upsert_instruction_cost(&system_program::id(), &program_cost)
.unwrap();
assert_eq!(expected_cost, testee.find_transaction_cost(&tx));
}
@ -355,11 +311,12 @@ mod tests {
);
debug!("many random transaction {:?}", tx);
// expected cost for two random/unknown program is
let expected_cost = DEFAULT_PROGRAM_COST * 2;
let testee = CostModel::default();
assert_eq!(expected_cost, testee.find_transaction_cost(&tx));
let result = testee.find_transaction_cost(&tx);
// expected cost for two random/unknown program is
let expected_cost = testee.instruction_execution_cost_table.get_mode() * 2;
assert_eq!(expected_cost, result);
}
#[test]
@ -411,7 +368,7 @@ mod tests {
let mut cost_model = CostModel::default();
// Using default cost for unknown instruction
assert_eq!(
DEFAULT_PROGRAM_COST,
cost_model.instruction_execution_cost_table.get_mode(),
cost_model.find_instruction_cost(&key1)
);
@ -431,9 +388,12 @@ mod tests {
let expected_account_cost = SIGNED_WRITABLE_ACCOUNT_ACCESS_COST
+ NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST
+ NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST;
let expected_execution_cost = COST_UNIT * 8;
let expected_execution_cost = 8;
let cost_model = CostModel::default();
let mut cost_model = CostModel::default();
cost_model
.upsert_instruction_cost(&system_program::id(), &expected_execution_cost)
.unwrap();
let tx_cost = cost_model.calculate_cost(&tx);
assert_eq!(expected_account_cost, tx_cost.account_access_cost);
assert_eq!(expected_execution_cost, tx_cost.execution_cost);
@ -465,7 +425,6 @@ mod tests {
let expected_account_cost = SIGNED_WRITABLE_ACCOUNT_ACCESS_COST
+ NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST
+ NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST;
let expected_execution_cost = COST_UNIT * 8;
let cost_model = Arc::new(CostModel::default());
@ -483,7 +442,10 @@ mod tests {
let tx_cost = cost_model.calculate_cost(&simple_transaction);
assert_eq!(2, tx_cost.writable_accounts.len());
assert_eq!(expected_account_cost, tx_cost.account_access_cost);
assert_eq!(expected_execution_cost, tx_cost.execution_cost);
assert_eq!(
cost_model.instruction_execution_cost_table.get_mode(),
tx_cost.execution_cost
);
})
})
.collect();
@ -520,7 +482,6 @@ mod tests {
let cost1 = 100;
let cost2 = 200;
// execution cost can be either 2 * Default (before write) or cost1+cost2 (after write)
let expected_execution_cost = Arc::new(vec![cost1 + cost2, DEFAULT_PROGRAM_COST * 2]);
let cost_model: Arc<RwLock<CostModel>> = Arc::new(RwLock::new(CostModel::default()));
@ -528,7 +489,6 @@ mod tests {
.map(|i| {
let cost_model = cost_model.clone();
let tx = tx.clone();
let expected_execution_cost = expected_execution_cost.clone();
if i == 5 {
thread::spawn(move || {
@ -541,7 +501,6 @@ mod tests {
let tx_cost = cost_model.read().unwrap().calculate_cost(&tx);
assert_eq!(3, tx_cost.writable_accounts.len());
assert_eq!(expected_account_cost, tx_cost.account_access_cost);
assert!(expected_execution_cost.contains(&tx_cost.execution_cost));
})
}
})

View File

@ -7,15 +7,15 @@ use std::collections::HashMap;
#[derive(Debug, Clone)]
pub struct CostTracker {
account_cost_limit: u32,
block_cost_limit: u32,
account_cost_limit: u64,
block_cost_limit: u64,
current_bank_slot: Slot,
cost_by_writable_accounts: HashMap<Pubkey, u32>,
block_cost: u32,
cost_by_writable_accounts: HashMap<Pubkey, u64>,
block_cost: u64,
}
impl CostTracker {
pub fn new(chain_max: u32, package_max: u32) -> Self {
pub fn new(chain_max: u64, package_max: u64) -> Self {
assert!(chain_max <= package_max);
Self {
account_cost_limit: chain_max,
@ -34,7 +34,7 @@ impl CostTracker {
}
}
pub fn try_add(&mut self, transaction_cost: TransactionCost) -> Result<u32, &'static str> {
pub fn try_add(&mut self, transaction_cost: TransactionCost) -> Result<u64, &'static str> {
let cost = transaction_cost.account_access_cost + transaction_cost.execution_cost;
self.would_fit(&transaction_cost.writable_accounts, &cost)?;
@ -42,7 +42,7 @@ impl CostTracker {
Ok(self.block_cost)
}
fn would_fit(&self, keys: &[Pubkey], cost: &u32) -> Result<(), &'static str> {
fn would_fit(&self, keys: &[Pubkey], cost: &u64) -> Result<(), &'static str> {
// check against the total package cost
if self.block_cost + cost > self.block_cost_limit {
return Err("would exceed block cost limit");
@ -70,7 +70,7 @@ impl CostTracker {
Ok(())
}
fn add_transaction(&mut self, keys: &[Pubkey], cost: &u32) {
fn add_transaction(&mut self, keys: &[Pubkey], cost: &u64) {
for account_key in keys.iter() {
*self
.cost_by_writable_accounts
@ -84,10 +84,10 @@ impl CostTracker {
// CostStats can be collected by util, such as ledger_tool
#[derive(Default, Debug)]
pub struct CostStats {
pub total_cost: u32,
pub total_cost: u64,
pub number_of_accounts: usize,
pub costliest_account: Pubkey,
pub costliest_account_cost: u32,
pub costliest_account_cost: u64,
}
impl CostTracker {
@ -140,7 +140,7 @@ mod tests {
fn build_simple_transaction(
mint_keypair: &Keypair,
start_hash: &Hash,
) -> (Transaction, Vec<Pubkey>, u32) {
) -> (Transaction, Vec<Pubkey>, u64) {
let keypair = Keypair::new();
let simple_transaction =
system_transaction::transfer(&mint_keypair, &keypair.pubkey(), 2, *start_hash);

View File

@ -0,0 +1,277 @@
/// ExecuteCostTable is aggregated by Cost Model, it keeps each program's
/// average cost in its HashMap, with fixed capacity to avoid from growing
/// unchecked.
/// When its capacity limit is reached, it prunes old and less-used programs
/// to make room for new ones.
use log::*;
use solana_sdk::pubkey::Pubkey;
use std::{collections::HashMap, time::SystemTime};
// prune is rather expensive op, free up bulk space in each operation
// would be more efficient. PRUNE_RATIO defines the after prune table
// size will be original_size * PRUNE_RATIO.
const PRUNE_RATIO: f64 = 0.75;
// with 50_000 TPS as norm, weights occurrences '100' per microsec
const OCCURRENCES_WEIGHT: i64 = 100;
const DEFAULT_CAPACITY: usize = 1024;
#[derive(Debug)]
pub struct ExecuteCostTable {
capacity: usize,
table: HashMap<Pubkey, u64>,
occurrences: HashMap<Pubkey, (usize, SystemTime)>,
}
impl Default for ExecuteCostTable {
fn default() -> Self {
ExecuteCostTable::new(DEFAULT_CAPACITY)
}
}
impl ExecuteCostTable {
pub fn new(cap: usize) -> Self {
Self {
capacity: cap,
table: HashMap::new(),
occurrences: HashMap::new(),
}
}
pub fn get_cost_table(&self) -> &HashMap<Pubkey, u64> {
&self.table
}
pub fn get_count(&self) -> usize {
self.table.len()
}
// instead of assigning unknown program with a configured/hard-coded cost
// use average or mode function to make a educated guess.
pub fn get_average(&self) -> u64 {
if self.table.is_empty() {
0
} else {
self.table.iter().map(|(_, value)| value).sum::<u64>() / self.get_count() as u64
}
}
pub fn get_mode(&self) -> u64 {
if self.occurrences.is_empty() {
0
} else {
let key = self
.occurrences
.iter()
.max_by_key(|&(_, count)| count)
.map(|(key, _)| key)
.expect("cannot find mode from cost table");
*self.table.get(&key).unwrap()
}
}
// returns None if program doesn't exist in table. In this case,
// client is advised to call `get_average()` or `get_mode()` to
// assign a 'default' value for new program.
pub fn get_cost(&self, key: &Pubkey) -> Option<&u64> {
self.table.get(&key)
}
pub fn upsert(&mut self, key: &Pubkey, value: &u64) {
let need_to_add = self.table.get(&key).is_none();
let current_size = self.get_count();
if current_size == self.capacity && need_to_add {
self.prune_to(&((current_size as f64 * PRUNE_RATIO) as usize));
}
let program_cost = self.table.entry(*key).or_insert(*value);
*program_cost = (*program_cost + *value) / 2;
let (count, timestamp) = self
.occurrences
.entry(*key)
.or_insert((0, SystemTime::now()));
*count += 1;
*timestamp = SystemTime::now();
}
// prune the old programs so the table contains `new_size` of records,
// where `old` is defined as weighted age, which is negatively correlated
// with program's age and
// positively correlated with how frequently the program
// is executed (eg. occurrence),
fn prune_to(&mut self, new_size: &usize) {
debug!(
"prune cost table, current size {}, new size {}",
self.get_count(),
new_size
);
if *new_size == self.get_count() {
return;
}
if *new_size == 0 {
self.table.clear();
self.occurrences.clear();
return;
}
let now = SystemTime::now();
let mut sorted_by_weighted_age: Vec<_> = self
.occurrences
.iter()
.map(|(key, (count, timestamp))| {
let age = now.duration_since(*timestamp).unwrap().as_micros();
let weighted_age = *count as i64 * OCCURRENCES_WEIGHT + -(age as i64);
(weighted_age, *key)
})
.collect();
sorted_by_weighted_age.sort_by(|x, y| x.0.partial_cmp(&y.0).unwrap());
for i in sorted_by_weighted_age.iter() {
self.table.remove(&i.1);
self.occurrences.remove(&i.1);
if *new_size == self.get_count() {
break;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_execute_cost_table_prune_simple_table() {
solana_logger::setup();
let capacity: usize = 3;
let mut testee = ExecuteCostTable::new(capacity);
let key1 = Pubkey::new_unique();
let key2 = Pubkey::new_unique();
let key3 = Pubkey::new_unique();
testee.upsert(&key1, &1);
testee.upsert(&key2, &2);
testee.upsert(&key3, &3);
testee.prune_to(&(capacity - 1));
// the oldest, key1, should be pruned
assert!(testee.get_cost(&key1).is_none());
assert!(testee.get_cost(&key2).is_some());
assert!(testee.get_cost(&key2).is_some());
}
#[test]
fn test_execute_cost_table_prune_weighted_table() {
solana_logger::setup();
let capacity: usize = 3;
let mut testee = ExecuteCostTable::new(capacity);
let key1 = Pubkey::new_unique();
let key2 = Pubkey::new_unique();
let key3 = Pubkey::new_unique();
testee.upsert(&key1, &1);
testee.upsert(&key1, &1);
testee.upsert(&key2, &2);
testee.upsert(&key3, &3);
testee.prune_to(&(capacity - 1));
// the oldest, key1, has 2 counts; 2nd oldest Key2 has 1 count;
// expect key2 to be pruned.
assert!(testee.get_cost(&key1).is_some());
assert!(testee.get_cost(&key2).is_none());
assert!(testee.get_cost(&key3).is_some());
}
#[test]
fn test_execute_cost_table_upsert_within_capacity() {
solana_logger::setup();
let mut testee = ExecuteCostTable::default();
let key1 = Pubkey::new_unique();
let key2 = Pubkey::new_unique();
let cost1: u64 = 100;
let cost2: u64 = 110;
// query empty table
assert!(testee.get_cost(&key1).is_none());
// insert one record
testee.upsert(&key1, &cost1);
assert_eq!(1, testee.get_count());
assert_eq!(cost1, testee.get_average());
assert_eq!(cost1, testee.get_mode());
assert_eq!(&cost1, testee.get_cost(&key1).unwrap());
// insert 2nd record
testee.upsert(&key2, &cost2);
assert_eq!(2, testee.get_count());
assert_eq!((cost1 + cost2) / 2_u64, testee.get_average());
assert_eq!(cost2, testee.get_mode());
assert_eq!(&cost1, testee.get_cost(&key1).unwrap());
assert_eq!(&cost2, testee.get_cost(&key2).unwrap());
// update 1st record
testee.upsert(&key1, &cost2);
assert_eq!(2, testee.get_count());
assert_eq!(((cost1 + cost2) / 2 + cost2) / 2, testee.get_average());
assert_eq!((cost1 + cost2) / 2, testee.get_mode());
assert_eq!(&((cost1 + cost2) / 2), testee.get_cost(&key1).unwrap());
assert_eq!(&cost2, testee.get_cost(&key2).unwrap());
}
#[test]
fn test_execute_cost_table_upsert_exceeds_capacity() {
solana_logger::setup();
let capacity: usize = 2;
let mut testee = ExecuteCostTable::new(capacity);
let key1 = Pubkey::new_unique();
let key2 = Pubkey::new_unique();
let key3 = Pubkey::new_unique();
let key4 = Pubkey::new_unique();
let cost1: u64 = 100;
let cost2: u64 = 110;
let cost3: u64 = 120;
let cost4: u64 = 130;
// insert one record
testee.upsert(&key1, &cost1);
assert_eq!(1, testee.get_count());
assert_eq!(&cost1, testee.get_cost(&key1).unwrap());
// insert 2nd record
testee.upsert(&key2, &cost2);
assert_eq!(2, testee.get_count());
assert_eq!(&cost1, testee.get_cost(&key1).unwrap());
assert_eq!(&cost2, testee.get_cost(&key2).unwrap());
// insert 3rd record, pushes out the oldest (eg 1st) record
testee.upsert(&key3, &cost3);
assert_eq!(2, testee.get_count());
assert_eq!((cost2 + cost3) / 2_u64, testee.get_average());
assert_eq!(cost3, testee.get_mode());
assert!(testee.get_cost(&key1).is_none());
assert_eq!(&cost2, testee.get_cost(&key2).unwrap());
assert_eq!(&cost3, testee.get_cost(&key3).unwrap());
// update 2nd record, so the 3rd becomes the oldest
// add 4th record, pushes out 3rd key
testee.upsert(&key2, &cost1);
testee.upsert(&key4, &cost4);
assert_eq!(((cost1 + cost2) / 2 + cost4) / 2_u64, testee.get_average());
assert_eq!((cost1 + cost2) / 2, testee.get_mode());
assert_eq!(2, testee.get_count());
assert!(testee.get_cost(&key1).is_none());
assert_eq!(&((cost1 + cost2) / 2), testee.get_cost(&key2).unwrap());
assert!(testee.get_cost(&key3).is_none());
assert_eq!(&cost4, testee.get_cost(&key4).unwrap());
}
}

View File

@ -20,6 +20,7 @@ pub mod completed_data_sets_service;
pub mod consensus;
pub mod cost_model;
pub mod cost_tracker;
pub mod execute_cost_table;
pub mod fetch_stage;
pub mod fork_choice;
pub mod gen_keys;

View File

@ -13,6 +13,7 @@ use crate::{
consensus::{
ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes, SWITCH_FORK_THRESHOLD,
},
cost_model::CostModel,
fork_choice::{ForkChoice, SelectVoteAndResetForkResult},
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks,
@ -40,8 +41,8 @@ use solana_rpc::{
rpc_subscriptions::RpcSubscriptions,
};
use solana_runtime::{
accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks,
commitment::BlockCommitmentCache, vote_sender_types::ReplayVoteSender,
accounts_background_service::AbsRequestSender, bank::Bank, bank::ExecuteTimings,
bank_forks::BankForks, commitment::BlockCommitmentCache, vote_sender_types::ReplayVoteSender,
};
use solana_sdk::{
clock::{Slot, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS},
@ -294,6 +295,7 @@ impl ReplayStage {
gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
cluster_slots_update_sender: ClusterSlotsUpdateSender,
cost_model: Arc<RwLock<CostModel>>,
) -> Self {
let ReplayStageConfig {
my_pubkey,
@ -390,6 +392,7 @@ impl ReplayStage {
&mut unfrozen_gossip_verified_vote_hashes,
&mut latest_validator_votes_for_frozen_banks,
&cluster_slots_update_sender,
&cost_model,
);
replay_active_banks_time.stop();
@ -1663,6 +1666,7 @@ impl ReplayStage {
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks,
cluster_slots_update_sender: &ClusterSlotsUpdateSender,
cost_model: &RwLock<CostModel>,
) -> bool {
let mut did_complete_bank = false;
let mut tx_count = 0;
@ -1719,6 +1723,11 @@ impl ReplayStage {
replay_vote_sender,
verify_recyclers,
);
Self::update_cost_model(&cost_model, &bank_progress.replay_stats.execute_timings);
debug!(
"after replayed into bank, updated cost model instruction cost table, current values: {:?}",
cost_model.read().unwrap().get_instruction_cost_table()
);
match replay_result {
Ok(replay_tx_count) => tx_count += replay_tx_count,
Err(err) => {
@ -1908,6 +1917,32 @@ impl ReplayStage {
new_stats
}
fn update_cost_model(cost_model: &RwLock<CostModel>, execute_timings: &ExecuteTimings) {
let mut cost_model_mutable = cost_model.write().unwrap();
for (program_id, stats) in &execute_timings.details.per_program_timings {
let cost = stats.0 / stats.1 as u64;
match cost_model_mutable.upsert_instruction_cost(&program_id, &cost) {
Ok(c) => {
debug!(
"after replayed into bank, instruction {:?} has averaged cost {}",
program_id, c
);
}
Err(err) => {
debug!(
"after replayed into bank, instruction {:?} failed to update cost, err: {}",
program_id, err
);
}
}
}
drop(cost_model_mutable);
debug!(
"after replayed into bank, updated cost model instruction cost table, current values: {:?}",
cost_model.read().unwrap().get_instruction_cost_table()
);
}
fn update_propagation_status(
progress: &mut ProgressMap,
slot: Slot,
@ -4910,6 +4945,90 @@ mod tests {
assert_eq!(tower.last_voted_slot().unwrap(), 1);
}
#[test]
fn test_update_cost_model_with_empty_execute_timings() {
let cost_model = Arc::new(RwLock::new(CostModel::default()));
let empty_execute_timings = ExecuteTimings::default();
ReplayStage::update_cost_model(&cost_model, &empty_execute_timings);
assert_eq!(
0,
cost_model
.read()
.unwrap()
.get_instruction_cost_table()
.len()
);
}
#[test]
fn test_update_cost_model_with_execute_timings() {
let cost_model = Arc::new(RwLock::new(CostModel::default()));
let mut execute_timings = ExecuteTimings::default();
let program_key_1 = Pubkey::new_unique();
let mut expected_cost: u64;
// add new program
{
let accumulated_us: u64 = 1000;
let count: u32 = 10;
expected_cost = accumulated_us / count as u64;
execute_timings
.details
.per_program_timings
.insert(program_key_1, (accumulated_us, count));
ReplayStage::update_cost_model(&cost_model, &execute_timings);
assert_eq!(
1,
cost_model
.read()
.unwrap()
.get_instruction_cost_table()
.len()
);
assert_eq!(
Some(&expected_cost),
cost_model
.read()
.unwrap()
.get_instruction_cost_table()
.get(&program_key_1)
);
}
// update program
{
let accumulated_us: u64 = 2000;
let count: u32 = 10;
// to expect new cost is Average(new_value, existing_value)
expected_cost = ((accumulated_us / count as u64) + expected_cost) / 2;
execute_timings
.details
.per_program_timings
.insert(program_key_1, (accumulated_us, count));
ReplayStage::update_cost_model(&cost_model, &execute_timings);
assert_eq!(
1,
cost_model
.read()
.unwrap()
.get_instruction_cost_table()
.len()
);
assert_eq!(
Some(&expected_cost),
cost_model
.read()
.unwrap()
.get_instruction_cost_table()
.get(&program_key_1)
);
}
}
fn run_compute_and_select_forks(
bank_forks: &RwLock<BankForks>,
progress: &mut ProgressMap,

View File

@ -8,6 +8,7 @@ use crate::{
ClusterInfoVoteListener, GossipDuplicateConfirmedSlotsSender, GossipVerifiedVoteHashSender,
VerifiedVoteSender, VoteTracker,
},
cost_model::CostModel,
fetch_stage::FetchStage,
sigverify::TransactionSigVerifier,
sigverify_stage::SigVerifyStage,
@ -69,6 +70,7 @@ impl Tpu {
bank_notification_sender: Option<BankNotificationSender>,
tpu_coalesce_ms: u64,
cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
cost_model: &Arc<RwLock<CostModel>>,
) -> Self {
let (packet_sender, packet_receiver) = channel();
let fetch_stage = FetchStage::new_with_sender(
@ -110,6 +112,7 @@ impl Tpu {
verified_vote_packets_receiver,
transaction_status_sender,
replay_vote_sender,
cost_model,
);
let broadcast_stage = broadcast_type.new_broadcast_stage(

View File

@ -12,6 +12,7 @@ use crate::{
cluster_slots::ClusterSlots,
completed_data_sets_service::CompletedDataSetsSender,
consensus::Tower,
cost_model::CostModel,
ledger_cleanup_service::LedgerCleanupService,
replay_stage::{ReplayStage, ReplayStageConfig},
retransmit_stage::RetransmitStage,
@ -128,6 +129,7 @@ impl Tvu {
gossip_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
tvu_config: TvuConfig,
max_slots: &Arc<MaxSlots>,
cost_model: &Arc<RwLock<CostModel>>,
) -> Self {
let keypair: Arc<Keypair> = cluster_info.keypair.clone();
@ -291,6 +293,7 @@ impl Tvu {
gossip_confirmed_slots_receiver,
gossip_verified_vote_hash_receiver,
cluster_slots_update_sender,
cost_model.clone(),
);
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
@ -437,6 +440,7 @@ pub mod tests {
gossip_confirmed_slots_receiver,
TvuConfig::default(),
&Arc::new(MaxSlots::default()),
&Arc::new(RwLock::new(CostModel::default())),
);
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();

View File

@ -6,6 +6,7 @@ use crate::{
cluster_info_vote_listener::VoteTracker,
completed_data_sets_service::CompletedDataSetsService,
consensus::{reconcile_blockstore_roots_with_tower, Tower},
cost_model::{CostModel, ACCOUNT_MAX_COST, BLOCK_MAX_COST},
rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
sample_performance_service::SamplePerformanceService,
serve_repair::ServeRepair,
@ -650,6 +651,11 @@ impl Validator {
bank_forks.read().unwrap().root_bank().deref(),
));
let cost_model = Arc::new(RwLock::new(CostModel::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
)));
let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
let (verified_vote_sender, verified_vote_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
@ -722,6 +728,7 @@ impl Validator {
wait_for_vote_to_start_leader,
},
&max_slots,
&cost_model,
);
let tpu = Tpu::new(
@ -747,6 +754,7 @@ impl Validator {
bank_notification_sender,
config.tpu_coalesce_ms,
cluster_confirmed_slot_sender,
&cost_model,
);
datapoint_info!("validator-new", ("id", id.to_string(), String));