diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs
index 9226150503..aeaceaceb6 100644
--- a/banking-bench/src/main.rs
+++ b/banking-bench/src/main.rs
@@ -4,7 +4,7 @@ use crossbeam_channel::unbounded;
 use log::*;
 use rand::{thread_rng, Rng};
 use rayon::prelude::*;
-use solana_core::banking_stage::BankingStage;
+use solana_core::{banking_stage::BankingStage, cost_model::CostModel};
 use solana_gossip::{cluster_info::ClusterInfo, cluster_info::Node};
 use solana_ledger::{
     blockstore::Blockstore,
@@ -26,7 +26,7 @@ use solana_sdk::{
     transaction::Transaction,
 };
 use std::{
-    sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex},
+    sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex, RwLock},
     thread::sleep,
     time::{Duration, Instant},
 };
@@ -224,6 +224,7 @@ fn main() {
         vote_receiver,
         None,
         replay_vote_sender,
+        &Arc::new(RwLock::new(CostModel::default())),
     );
     poh_recorder.lock().unwrap().set_bank(&bank);
diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs
index c243b72014..09118d3151 100644
--- a/core/benches/banking_stage.rs
+++ b/core/benches/banking_stage.rs
@@ -34,7 +34,7 @@ use solana_sdk::transaction::Transaction;
 use std::collections::VecDeque;
 use std::sync::atomic::Ordering;
 use std::sync::mpsc::Receiver;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, RwLock};
 use std::time::{Duration, Instant};
 use test::Bencher;
@@ -93,8 +93,8 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
            None::<Box<dyn Fn()>>,
            &BankingStageStats::default(),
            &recorder,
-           &Arc::new(CostModel::default()),
-           &Arc::new(Mutex::new(CostTracker::new(std::u32::MAX, std::u32::MAX))),
+           &Arc::new(RwLock::new(CostModel::default())),
+           &Arc::new(Mutex::new(CostTracker::new(std::u64::MAX, std::u64::MAX))),
         );
     });
@@ -215,8 +215,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
         vote_receiver,
         None,
         s,
-        std::u32::MAX,
-        std::u32::MAX,
+        &Arc::new(RwLock::new(CostModel::new(std::u64::MAX, std::u64::MAX))),
     );
     poh_recorder.lock().unwrap().set_bank(&bank);
diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index ea464cf868..a0548a1d24 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -1,11 +1,7 @@
 //! The `banking_stage` processes Transaction messages. It is intended to be used
 //! to contruct a software pipeline. The stage uses all available CPU cores and
 //! can do its processing in parallel with signature verification on the GPU.
-use crate::{
-    cost_model::{CostModel, ACCOUNT_MAX_COST, BLOCK_MAX_COST},
-    cost_tracker::CostTracker,
-    packet_hasher::PacketHasher,
-};
+use crate::{cost_model::CostModel, cost_tracker::CostTracker, packet_hasher::PacketHasher};
 use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
 use itertools::Itertools;
 use lru::LruCache;
@@ -55,7 +51,7 @@ use std::{
     net::UdpSocket,
     ops::DerefMut,
     sync::atomic::{AtomicU64, AtomicUsize, Ordering},
-    sync::{Arc, Mutex},
+    sync::{Arc, Mutex, RwLock},
     thread::{self, Builder, JoinHandle},
     time::Duration,
     time::Instant,
@@ -226,6 +222,7 @@ impl BankingStage {
         verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
         transaction_status_sender: Option<TransactionStatusSender>,
         gossip_vote_sender: ReplayVoteSender,
+        cost_model: &Arc<RwLock<CostModel>>,
     ) -> Self {
         Self::new_with_cost_limit(
             cluster_info,
@@ -234,8 +231,7 @@ impl BankingStage {
             verified_vote_receiver,
             transaction_status_sender,
             gossip_vote_sender,
-            ACCOUNT_MAX_COST,
-            BLOCK_MAX_COST,
+            cost_model,
         )
     }
@@ -246,15 +242,12 @@ impl BankingStage {
         verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
         transaction_status_sender: Option<TransactionStatusSender>,
         gossip_vote_sender: ReplayVoteSender,
-        account_cost_limit: u32,
-        block_cost_limit: u32,
+        cost_model: &Arc<RwLock<CostModel>>,
     ) -> Self {
-        // shared immutable 'cost_model' that calcuates transaction costs
         // shared mutex guarded 'cost_tracker' tracks bank's cost against configured limits.
-        let cost_model = Arc::new(CostModel::new(account_cost_limit, block_cost_limit));
         let cost_tracker = Arc::new(Mutex::new(CostTracker::new(
-            cost_model.get_account_cost_limit(),
-            cost_model.get_block_cost_limit(),
+            cost_model.read().unwrap().get_account_cost_limit(),
+            cost_model.read().unwrap().get_block_cost_limit(),
         )));
         Self::new_num_threads(
             cluster_info,
@@ -277,7 +270,7 @@ impl BankingStage {
         num_threads: u32,
         transaction_status_sender: Option<TransactionStatusSender>,
         gossip_vote_sender: ReplayVoteSender,
-        cost_model: &Arc<CostModel>,
+        cost_model: &Arc<RwLock<CostModel>>,
         cost_tracker: &Arc<Mutex<CostTracker>>,
     ) -> Self {
         let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
@@ -386,7 +379,7 @@ impl BankingStage {
         test_fn: Option<impl Fn()>,
         banking_stage_stats: &BankingStageStats,
         recorder: &TransactionRecorder,
-        cost_model: &Arc<CostModel>,
+        cost_model: &Arc<RwLock<CostModel>>,
         cost_tracker: &Arc<Mutex<CostTracker>>,
     ) {
         let mut rebuffered_packets_len = 0;
@@ -530,7 +523,7 @@ impl BankingStage {
         gossip_vote_sender: &ReplayVoteSender,
         banking_stage_stats: &BankingStageStats,
         recorder: &TransactionRecorder,
-        cost_model: &Arc<CostModel>,
+        cost_model: &Arc<RwLock<CostModel>>,
         cost_tracker: &Arc<Mutex<CostTracker>>,
     ) -> BufferedPacketsDecision {
         let bank_start;
@@ -647,7 +640,7 @@ impl BankingStage {
         transaction_status_sender: Option<TransactionStatusSender>,
         gossip_vote_sender: ReplayVoteSender,
         duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
-        cost_model: &Arc<CostModel>,
+        cost_model: &Arc<RwLock<CostModel>>,
         cost_tracker: &Arc<Mutex<CostTracker>>,
     ) {
         let recorder = poh_recorder.lock().unwrap().recorder();
@@ -1044,7 +1037,7 @@ impl BankingStage {
         msgs: &Packets,
         transaction_indexes: &[usize],
         secp256k1_program_enabled: bool,
-        cost_model: &Arc<CostModel>,
+        cost_model: &Arc<RwLock<CostModel>>,
         cost_tracker: &Arc<Mutex<CostTracker>>,
     ) -> (Vec<HashedTransaction<'static>>, Vec<usize>, Vec<usize>) {
         // Making a snapshot of shared cost_tracker by clone(), drop lock immediately.
@@ -1062,11 +1055,11 @@ impl BankingStage {
                 tx.verify_precompiles().ok()?;
             }
 
-            // Get transaction cost via immutable cost_model; try to add cost to
+            // Get transaction cost via cost_model; try to add cost to
             // local copy of cost_tracker, if suceeded, local copy is updated
             // and transaction added to valid list; otherwise, transaction is
             // added to retry list. No locking here.
-            let tx_cost = cost_model.calculate_cost(&tx);
+            let tx_cost = cost_model.read().unwrap().calculate_cost(&tx);
             let result = cost_tracker.try_add(tx_cost);
             if result.is_err() {
                 debug!("transaction {:?} would exceed limit: {:?}", tx, result);
@@ -1139,7 +1132,7 @@ impl BankingStage {
         transaction_status_sender: Option<TransactionStatusSender>,
         gossip_vote_sender: &ReplayVoteSender,
         banking_stage_stats: &BankingStageStats,
-        cost_model: &Arc<CostModel>,
+        cost_model: &Arc<RwLock<CostModel>>,
         cost_tracker: &Arc<Mutex<CostTracker>>,
     ) -> (usize, usize, Vec<usize>) {
         let mut packet_conversion_time = Measure::start("packet_conversion");
@@ -1177,7 +1170,7 @@ impl BankingStage {
         // applying cost of processed transactions to shared cost_tracker
         transactions.iter().enumerate().for_each(|(index, tx)| {
             if !unprocessed_tx_indexes.iter().any(|&i| i == index) {
-                let tx_cost = cost_model.calculate_cost(&tx.transaction());
+                let tx_cost = cost_model.read().unwrap().calculate_cost(&tx.transaction());
                 let mut guard = cost_tracker.lock().unwrap();
                 let _result = guard.try_add(tx_cost);
                 drop(guard);
@@ -1221,7 +1214,7 @@ impl BankingStage {
         transaction_indexes: &[usize],
         my_pubkey: &Pubkey,
         next_leader: Option<Pubkey>,
-        cost_model: &Arc<CostModel>,
+        cost_model: &Arc<RwLock<CostModel>>,
         cost_tracker: &Arc<Mutex<CostTracker>>,
     ) -> Vec<usize> {
         // Check if we are the next leader. If so, let's not filter the packets
@@ -1293,7 +1286,7 @@ impl BankingStage {
         banking_stage_stats: &BankingStageStats,
         duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
         recorder: &TransactionRecorder,
-        cost_model: &Arc<CostModel>,
+        cost_model: &Arc<RwLock<CostModel>>,
         cost_tracker: &Arc<Mutex<CostTracker>>,
     ) -> Result<(), RecvTimeoutError> {
         let mut recv_time = Measure::start("process_packets_recv");
@@ -1515,6 +1508,7 @@ fn next_leader_tpu_forwards(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::cost_model::{ACCOUNT_MAX_COST, BLOCK_MAX_COST};
     use crossbeam_channel::unbounded;
     use itertools::Itertools;
     use solana_gossip::cluster_info::Node;
@@ -1575,6 +1569,7 @@ mod tests {
             vote_receiver,
             None,
             gossip_vote_sender,
+            &Arc::new(RwLock::new(CostModel::default())),
         );
         drop(verified_sender);
         drop(vote_sender);
@@ -1620,6 +1615,7 @@ mod tests {
             vote_receiver,
             None,
             gossip_vote_sender,
+            &Arc::new(RwLock::new(CostModel::default())),
         );
         trace!("sending bank");
         drop(verified_sender);
@@ -1689,6 +1685,7 @@ mod tests {
             vote_receiver,
             None,
             gossip_vote_sender,
+            &Arc::new(RwLock::new(CostModel::default())),
         );
 
         // fund another account so we can send 2 good transactions in a single batch.
@@ -1837,7 +1834,7 @@ mod tests {
             2,
             None,
             gossip_vote_sender,
-            &Arc::new(CostModel::default()),
+            &Arc::new(RwLock::new(CostModel::default())),
             &Arc::new(Mutex::new(CostTracker::new(
                 ACCOUNT_MAX_COST,
                 BLOCK_MAX_COST,
@@ -2662,7 +2659,7 @@ mod tests {
                 None::<Box<dyn Fn()>>,
                 &BankingStageStats::default(),
                 &recorder,
-                &Arc::new(CostModel::default()),
+                &Arc::new(RwLock::new(CostModel::default())),
                 &Arc::new(Mutex::new(CostTracker::new(
                     ACCOUNT_MAX_COST,
                     BLOCK_MAX_COST,
@@ -2683,7 +2680,7 @@ mod tests {
                 None::<Box<dyn Fn()>>,
                 &BankingStageStats::default(),
                 &recorder,
-                &Arc::new(CostModel::default()),
+                &Arc::new(RwLock::new(CostModel::default())),
                 &Arc::new(Mutex::new(CostTracker::new(
                     ACCOUNT_MAX_COST,
                     BLOCK_MAX_COST,
@@ -2753,7 +2750,7 @@ mod tests {
                 test_fn,
                 &BankingStageStats::default(),
                 &recorder,
-                &Arc::new(CostModel::default()),
+                &Arc::new(RwLock::new(CostModel::default())),
                 &Arc::new(Mutex::new(CostTracker::new(
                     ACCOUNT_MAX_COST,
                     BLOCK_MAX_COST,
diff --git a/core/src/cost_model.rs b/core/src/cost_model.rs
index 2b078fe591..ed81fc27ed 100644
--- a/core/src/cost_model.rs
+++ b/core/src/cost_model.rs
@@ -7,33 +7,21 @@
 //! is transaction's "execution cost"
 //! The main function is `calculate_cost` which returns a TransactionCost struct.
 //!
+use crate::execute_cost_table::ExecuteCostTable;
 use log::*;
-use solana_sdk::{
-    bpf_loader, bpf_loader_deprecated, bpf_loader_upgradeable, feature, incinerator,
-    message::Message, native_loader, pubkey::Pubkey, secp256k1_program, system_program,
-    transaction::Transaction,
-};
+use solana_sdk::{message::Message, pubkey::Pubkey, transaction::Transaction};
 use std::collections::HashMap;
 
-// from mainnet-beta data, taking `vote program` as 1 COST_UNIT to load and execute
-// amount all type programs, the costs are:
-// min: 0.9 COST_UNIT
-// max: 110 COST UNIT
-// Median: 12 COST_UNIT
-// Average: 19 COST_UNIT
-const COST_UNIT: u32 = 1;
-const DEFAULT_PROGRAM_COST: u32 = COST_UNIT * 100;
-// re-adjust these numbers if needed
-const SIGNED_WRITABLE_ACCOUNT_ACCESS_COST: u32 = COST_UNIT * 10;
-const SIGNED_READONLY_ACCOUNT_ACCESS_COST: u32 = COST_UNIT * 2;
-const NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST: u32 = COST_UNIT * 5;
-const NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST: u32 = COST_UNIT;
-// running 'ledger-tool compute-cost' over mainnet ledger, the largest block cost
-// is 575_687, and the largest chain cost (eg account cost) is 559_000
-// Configuring cost model to have larger block limit and smaller account limit
-// to encourage packing parallelizable transactions in block.
-pub const ACCOUNT_MAX_COST: u32 = COST_UNIT * 10_000;
-pub const BLOCK_MAX_COST: u32 = COST_UNIT * 10_000_000;
+// Guesstimated from mainnet-beta data: sigverify averages 1us, reads average 7us and writes average 25us
+const SIGNED_WRITABLE_ACCOUNT_ACCESS_COST: u64 = 1 + 25;
+const SIGNED_READONLY_ACCOUNT_ACCESS_COST: u64 = 1 + 7;
+const NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST: u64 = 25;
+const NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST: u64 = 7;
+
+// Sampled from mainnet-beta, the instruction execution timings stats are (in us):
+// min=194, max=62164, avg=8214.49, med=2243
+pub const ACCOUNT_MAX_COST: u64 = 100_000_000;
+pub const BLOCK_MAX_COST: u64 = 2_500_000_000;
 
 // cost of transaction is made of account_access_cost and instruction execution_cost
 // where
@@ -44,52 +32,15 @@ pub const BLOCK_MAX_COST: u32 = COST_UNIT * 10_000_000;
 #[derive(Default, Debug)]
 pub struct TransactionCost {
     pub writable_accounts: Vec<Pubkey>,
-    pub account_access_cost: u32,
-    pub execution_cost: u32,
-}
-
-// instruction execution code table is initialized with default values, and
-// updated with realtime information (by Replay)
-#[derive(Debug)]
-struct InstructionExecutionCostTable {
-    pub table: HashMap<Pubkey, u32>,
-}
-
-macro_rules! costmetrics {
-    ($( $key: expr => $val: expr ),*) => {{
-        let mut hashmap: HashMap< Pubkey, u32 > = HashMap::new();
-        $( hashmap.insert( $key, $val); )*
-        hashmap
-    }}
-}
-
-impl InstructionExecutionCostTable {
-    // build cost table with default value
-    pub fn new() -> Self {
-        Self {
-            table: costmetrics![
-                solana_config_program::id() => COST_UNIT,
-                feature::id() => COST_UNIT * 2,
-                incinerator::id() => COST_UNIT * 2,
-                native_loader::id() => COST_UNIT * 2,
-                solana_stake_program::id() => COST_UNIT * 2,
-                solana_stake_program::config::id() => COST_UNIT,
-                solana_vote_program::id() => COST_UNIT,
-                secp256k1_program::id() => COST_UNIT,
-                system_program::id() => COST_UNIT * 8,
-                bpf_loader::id() => COST_UNIT * 500,
-                bpf_loader_deprecated::id() => COST_UNIT * 500,
-                bpf_loader_upgradeable::id() => COST_UNIT * 500
-            ],
-        }
-    }
+    pub account_access_cost: u64,
+    pub execution_cost: u64,
 }
 
 #[derive(Debug)]
 pub struct CostModel {
-    account_cost_limit: u32,
-    block_cost_limit: u32,
-    instruction_execution_cost_table: InstructionExecutionCostTable,
+    account_cost_limit: u64,
+    block_cost_limit: u64,
+    instruction_execution_cost_table: ExecuteCostTable,
 }
 
 impl Default for CostModel {
@@ -99,19 +50,19 @@ impl Default for CostModel {
 }
 
 impl CostModel {
-    pub fn new(chain_max: u32, block_max: u32) -> Self {
+    pub fn new(chain_max: u64, block_max: u64) -> Self {
         Self {
             account_cost_limit: chain_max,
             block_cost_limit: block_max,
-            instruction_execution_cost_table: InstructionExecutionCostTable::new(),
+            instruction_execution_cost_table: ExecuteCostTable::default(),
         }
     }
 
-    pub fn get_account_cost_limit(&self) -> u32 {
+    pub fn get_account_cost_limit(&self) -> u64 {
         self.account_cost_limit
     }
 
-    pub fn get_block_cost_limit(&self) -> u32 {
+    pub fn get_block_cost_limit(&self) -> u64 {
         self.block_cost_limit
     }
 
@@ -140,40 +91,39 @@ impl CostModel {
     }
 
     // To update or insert instruction cost to table.
-    // When updating, uses the average of new and old values to smooth out outliers
     pub fn upsert_instruction_cost(
         &mut self,
         program_key: &Pubkey,
-        cost: &u32,
-    ) -> Result<u32, &'static str> {
-        let instruction_cost = self
-            .instruction_execution_cost_table
-            .table
-            .entry(*program_key)
-            .or_insert(*cost);
-        *instruction_cost = (*instruction_cost + *cost) / 2;
-        Ok(*instruction_cost)
+        cost: &u64,
+    ) -> Result<u64, &'static str> {
+        self.instruction_execution_cost_table
+            .upsert(program_key, cost);
+        match self.instruction_execution_cost_table.get_cost(program_key) {
+            Some(cost) => Ok(*cost),
+            None => Err("failed to upsert to ExecuteCostTable"),
+        }
     }
 
-    fn find_instruction_cost(&self, program_key: &Pubkey) -> u32 {
-        match self
-            .instruction_execution_cost_table
-            .table
-            .get(&program_key)
-        {
+    pub fn get_instruction_cost_table(&self) -> &HashMap<Pubkey, u64> {
+        self.instruction_execution_cost_table.get_cost_table()
+    }
+
+    fn find_instruction_cost(&self, program_key: &Pubkey) -> u64 {
+        match self.instruction_execution_cost_table.get_cost(&program_key) {
             Some(cost) => *cost,
             None => {
+                let default_value = self.instruction_execution_cost_table.get_mode();
                 debug!(
-                    "Program key {:?} does not have assigned cost, using default {}",
-                    program_key, DEFAULT_PROGRAM_COST
+                    "Program key {:?} does not have assigned cost, using mode {}",
+                    program_key, default_value
                 );
-                DEFAULT_PROGRAM_COST
+                default_value
             }
         }
     }
 
-    fn find_transaction_cost(&self, transaction: &Transaction) -> u32 {
-        let mut cost: u32 = 0;
+    fn find_transaction_cost(&self, transaction: &Transaction) -> u64 {
+        let mut cost: u64 = 0;
 
         for instruction in &transaction.message().instructions {
             let program_id =
@@ -194,12 +144,12 @@ impl CostModel {
         signed_readonly_accounts: &[Pubkey],
         non_signed_writable_accounts: &[Pubkey],
         non_signed_readonly_accounts: &[Pubkey],
-    ) -> u32 {
+    ) -> u64 {
         let mut cost = 0;
-        cost += signed_writable_accounts.len() as u32 * SIGNED_WRITABLE_ACCOUNT_ACCESS_COST;
-        cost += signed_readonly_accounts.len() as u32 * SIGNED_READONLY_ACCOUNT_ACCESS_COST;
-        cost += non_signed_writable_accounts.len() as u32 * NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST;
-        cost += non_signed_readonly_accounts.len() as u32 * NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST;
+        cost += signed_writable_accounts.len() as u64 * SIGNED_WRITABLE_ACCOUNT_ACCESS_COST;
+        cost += signed_readonly_accounts.len() as u64 * SIGNED_READONLY_ACCOUNT_ACCESS_COST;
+        cost += non_signed_writable_accounts.len() as u64 * NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST;
+        cost += non_signed_readonly_accounts.len() as u64 * NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST;
         cost
     }
 
@@ -242,12 +192,13 @@ mod tests {
         genesis_utils::{create_genesis_config, GenesisConfigInfo},
     };
     use solana_sdk::{
+        bpf_loader,
         hash::Hash,
         instruction::CompiledInstruction,
         message::Message,
         signature::{Keypair, Signer},
         system_instruction::{self},
-        system_transaction,
+        system_program, system_transaction,
     };
     use std::{
         str::FromStr,
@@ -269,23 +220,21 @@ mod tests {
 
     #[test]
     fn test_cost_model_instruction_cost() {
-        let testee = CostModel::default();
+        let mut testee = CostModel::default();
+        let known_key = Pubkey::from_str("known11111111111111111111111111111111111111").unwrap();
+        testee.upsert_instruction_cost(&known_key, &100).unwrap();
         // find cost for known programs
-        assert_eq!(
-            COST_UNIT,
-            testee.find_instruction_cost(
-                &Pubkey::from_str("Vote111111111111111111111111111111111111111").unwrap()
-            )
-        );
-        assert_eq!(
-            COST_UNIT * 500,
-            testee.find_instruction_cost(&bpf_loader::id())
-        );
+        assert_eq!(100, testee.find_instruction_cost(&known_key));
+
+        testee
+            .upsert_instruction_cost(&bpf_loader::id(), &1999)
+            .unwrap();
+        assert_eq!(1999, testee.find_instruction_cost(&bpf_loader::id()));
 
         // unknown program is assigned with default cost
         assert_eq!(
-            DEFAULT_PROGRAM_COST,
+            testee.instruction_execution_cost_table.get_mode(),
            testee.find_instruction_cost(
                &Pubkey::from_str("unknown111111111111111111111111111111111111").unwrap()
            )
        )
@@ -305,9 +254,12 @@ mod tests {
         );
 
         // expected cost for one system transfer instructions
-        let expected_cost = COST_UNIT * 8;
+        let expected_cost = 8;
 
-        let testee = CostModel::default();
+        let mut testee = CostModel::default();
+        testee
+            .upsert_instruction_cost(&system_program::id(), &expected_cost)
+            .unwrap();
         assert_eq!(
             expected_cost,
             testee.find_transaction_cost(&simple_transaction)
@@ -327,9 +279,13 @@ mod tests {
         debug!("many transfer transaction {:?}", tx);
 
         // expected cost for two system transfer instructions
-        let expected_cost = COST_UNIT * 8 * 2;
+        let program_cost = 8;
+        let expected_cost = program_cost * 2;
 
-        let testee = CostModel::default();
+        let mut testee = CostModel::default();
+        testee
+            .upsert_instruction_cost(&system_program::id(), &program_cost)
+            .unwrap();
         assert_eq!(expected_cost, testee.find_transaction_cost(&tx));
     }
 
@@ -355,11 +311,12 @@ mod tests {
         );
         debug!("many random transaction {:?}", tx);
 
-        // expected cost for two random/unknown program is
-        let expected_cost = DEFAULT_PROGRAM_COST * 2;
-
-        let testee = CostModel::default();
-        assert_eq!(expected_cost, testee.find_transaction_cost(&tx));
+        let testee = CostModel::default();
+        let result = testee.find_transaction_cost(&tx);
+
+        // expected cost for two random/unknown program is
+        let expected_cost = testee.instruction_execution_cost_table.get_mode() * 2;
+        assert_eq!(expected_cost, result);
     }
 
     #[test]
@@ -411,7 +368,7 @@ mod tests {
         let mut cost_model = CostModel::default();
         // Using default cost for unknown instruction
         assert_eq!(
-            DEFAULT_PROGRAM_COST,
+            cost_model.instruction_execution_cost_table.get_mode(),
             cost_model.find_instruction_cost(&key1)
         );
 
@@ -431,9 +388,12 @@ mod tests {
         let expected_account_cost = SIGNED_WRITABLE_ACCOUNT_ACCESS_COST
             + NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST
             + NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST;
-        let expected_execution_cost = COST_UNIT * 8;
+        let expected_execution_cost = 8;
 
-        let cost_model = CostModel::default();
+        let mut cost_model = CostModel::default();
+        cost_model
+            .upsert_instruction_cost(&system_program::id(), &expected_execution_cost)
+            .unwrap();
         let tx_cost = cost_model.calculate_cost(&tx);
         assert_eq!(expected_account_cost, tx_cost.account_access_cost);
         assert_eq!(expected_execution_cost, tx_cost.execution_cost);
@@ -465,7 +425,6 @@ mod tests {
         let expected_account_cost = SIGNED_WRITABLE_ACCOUNT_ACCESS_COST
             + NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST
             + NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST;
-        let expected_execution_cost = COST_UNIT * 8;
 
         let cost_model = Arc::new(CostModel::default());
 
@@ -483,7 +442,10 @@ mod tests {
                     let tx_cost = cost_model.calculate_cost(&simple_transaction);
                     assert_eq!(2, tx_cost.writable_accounts.len());
                     assert_eq!(expected_account_cost, tx_cost.account_access_cost);
-                    assert_eq!(expected_execution_cost, tx_cost.execution_cost);
+                    assert_eq!(
+                        cost_model.instruction_execution_cost_table.get_mode(),
+                        tx_cost.execution_cost
+                    );
                 })
             })
             .collect();
@@ -520,7 +482,6 @@ mod tests {
         let cost1 = 100;
         let cost2 = 200;
         // execution cost can be either 2 * Default (before write) or cost1+cost2 (after write)
-        let expected_execution_cost = Arc::new(vec![cost1 + cost2, DEFAULT_PROGRAM_COST * 2]);
 
         let cost_model: Arc<RwLock<CostModel>> = Arc::new(RwLock::new(CostModel::default()));
 
@@ -528,7 +489,6 @@ mod tests {
             .map(|i| {
                 let cost_model = cost_model.clone();
                 let tx = tx.clone();
-                let expected_execution_cost = expected_execution_cost.clone();
 
                 if i == 5 {
                     thread::spawn(move || {
@@ -541,7 +501,6 @@ mod tests {
                         let tx_cost = cost_model.read().unwrap().calculate_cost(&tx);
                         assert_eq!(3, tx_cost.writable_accounts.len());
                         assert_eq!(expected_account_cost, tx_cost.account_access_cost);
-                        assert!(expected_execution_cost.contains(&tx_cost.execution_cost));
                     })
                 }
             })
diff --git a/core/src/cost_tracker.rs b/core/src/cost_tracker.rs
index 5c3fb037cf..064d56f6c9 100644
--- a/core/src/cost_tracker.rs
+++ b/core/src/cost_tracker.rs
@@ -7,15 +7,15 @@ use std::collections::HashMap;
 
 #[derive(Debug, Clone)]
 pub struct CostTracker {
-    account_cost_limit: u32,
-    block_cost_limit: u32,
+    account_cost_limit: u64,
+    block_cost_limit: u64,
     current_bank_slot: Slot,
-    cost_by_writable_accounts: HashMap<Pubkey, u32>,
-    block_cost: u32,
+    cost_by_writable_accounts: HashMap<Pubkey, u64>,
+    block_cost: u64,
 }
 
 impl CostTracker {
-    pub fn new(chain_max: u32, package_max: u32) -> Self {
+    pub fn new(chain_max: u64, package_max: u64) -> Self {
         assert!(chain_max <= package_max);
         Self {
             account_cost_limit: chain_max,
@@ -34,7 +34,7 @@ impl CostTracker {
         }
     }
 
-    pub fn try_add(&mut self, transaction_cost: TransactionCost) -> Result<u32, &'static str> {
+    pub fn try_add(&mut self, transaction_cost: TransactionCost) -> Result<u64, &'static str> {
         let cost = transaction_cost.account_access_cost + transaction_cost.execution_cost;
         self.would_fit(&transaction_cost.writable_accounts, &cost)?;
 
@@ -42,7 +42,7 @@ impl CostTracker {
         Ok(self.block_cost)
     }
 
-    fn would_fit(&self, keys: &[Pubkey], cost: &u32) -> Result<(), &'static str> {
+    fn would_fit(&self, keys: &[Pubkey], cost: &u64) -> Result<(), &'static str> {
         // check against the total package cost
         if self.block_cost + cost > self.block_cost_limit {
             return Err("would exceed block cost limit");
@@ -70,7 +70,7 @@ impl CostTracker {
         Ok(())
     }
 
-    fn add_transaction(&mut self, keys: &[Pubkey], cost: &u32) {
+    fn add_transaction(&mut self, keys: &[Pubkey], cost: &u64) {
         for account_key in keys.iter() {
             *self
                 .cost_by_writable_accounts
@@ -84,10 +84,10 @@ impl CostTracker {
 // CostStats can be collected by util, such as ledger_tool
 #[derive(Default, Debug)]
 pub struct CostStats {
-    pub total_cost: u32,
+    pub total_cost: u64,
     pub number_of_accounts: usize,
     pub costliest_account: Pubkey,
-    pub costliest_account_cost: u32,
+    pub costliest_account_cost: u64,
 }
 
 impl CostTracker {
@@ -140,7 +140,7 @@ mod tests {
     fn build_simple_transaction(
         mint_keypair: &Keypair,
         start_hash: &Hash,
-    ) -> (Transaction, Vec<Pubkey>, u32) {
+    ) -> (Transaction, Vec<Pubkey>, u64) {
         let keypair = Keypair::new();
         let simple_transaction =
             system_transaction::transfer(&mint_keypair, &keypair.pubkey(), 2, *start_hash);
diff --git a/core/src/execute_cost_table.rs b/core/src/execute_cost_table.rs
new file mode 100644
index 0000000000..5ef0ede4cf
--- /dev/null
+++ b/core/src/execute_cost_table.rs
@@ -0,0 +1,277 @@
+/// ExecuteCostTable is aggregated by the cost model; it keeps each program's
+/// average cost in its HashMap, with a fixed capacity to avoid growing
+/// unchecked.
+/// When its capacity limit is reached, it prunes old and less-used programs
+/// to make room for new ones.
+use log::*;
+use solana_sdk::pubkey::Pubkey;
+use std::{collections::HashMap, time::SystemTime};
+
+// Pruning is a rather expensive operation; freeing up bulk space in each
+// operation is more efficient. PRUNE_RATIO defines the table size after
+// pruning: new_size = original_size * PRUNE_RATIO.
+const PRUNE_RATIO: f64 = 0.75;
+// with 50_000 TPS as the norm, weight occurrences at '100' per microsecond
+const OCCURRENCES_WEIGHT: i64 = 100;
+
+const DEFAULT_CAPACITY: usize = 1024;
+
+#[derive(Debug)]
+pub struct ExecuteCostTable {
+    capacity: usize,
+    table: HashMap<Pubkey, u64>,
+    occurrences: HashMap<Pubkey, (usize, SystemTime)>,
+}
+
+impl Default for ExecuteCostTable {
+    fn default() -> Self {
+        ExecuteCostTable::new(DEFAULT_CAPACITY)
+    }
+}
+
+impl ExecuteCostTable {
+    pub fn new(cap: usize) -> Self {
+        Self {
+            capacity: cap,
+            table: HashMap::new(),
+            occurrences: HashMap::new(),
+        }
+    }
+
+    pub fn get_cost_table(&self) -> &HashMap<Pubkey, u64> {
+        &self.table
+    }
+
+    pub fn get_count(&self) -> usize {
+        self.table.len()
+    }
+
+    // instead of assigning an unknown program a configured/hard-coded cost,
+    // use the average or mode function to make an educated guess.
+    pub fn get_average(&self) -> u64 {
+        if self.table.is_empty() {
+            0
+        } else {
+            self.table.iter().map(|(_, value)| value).sum::<u64>() / self.get_count() as u64
+        }
+    }
+
+    pub fn get_mode(&self) -> u64 {
+        if self.occurrences.is_empty() {
+            0
+        } else {
+            let key = self
+                .occurrences
+                .iter()
+                .max_by_key(|&(_, count)| count)
+                .map(|(key, _)| key)
+                .expect("cannot find mode from cost table");
+
+            *self.table.get(&key).unwrap()
+        }
+    }
+
+    // returns None if program doesn't exist in table. In this case,
+    // the client is advised to call `get_average()` or `get_mode()` to
+    // assign a 'default' value for the new program.
+    pub fn get_cost(&self, key: &Pubkey) -> Option<&u64> {
+        self.table.get(&key)
+    }
+
+    pub fn upsert(&mut self, key: &Pubkey, value: &u64) {
+        let need_to_add = self.table.get(&key).is_none();
+        let current_size = self.get_count();
+        if current_size == self.capacity && need_to_add {
+            self.prune_to(&((current_size as f64 * PRUNE_RATIO) as usize));
+        }
+
+        let program_cost = self.table.entry(*key).or_insert(*value);
+        *program_cost = (*program_cost + *value) / 2;
+
+        let (count, timestamp) = self
+            .occurrences
+            .entry(*key)
+            .or_insert((0, SystemTime::now()));
+        *count += 1;
+        *timestamp = SystemTime::now();
+    }
+
+    // prune old programs so the table contains `new_size` records, where
+    // "old" is determined by a weighted age: the weight is negatively
+    // correlated with a program's age (time since last execution) and
+    // positively correlated with how frequently the program is executed
+    // (its occurrence count); entries with the lowest weight are pruned first.
+    fn prune_to(&mut self, new_size: &usize) {
+        debug!(
+            "prune cost table, current size {}, new size {}",
+            self.get_count(),
+            new_size
+        );
+
+        if *new_size == self.get_count() {
+            return;
+        }
+
+        if *new_size == 0 {
+            self.table.clear();
+            self.occurrences.clear();
+            return;
+        }
+
+        let now = SystemTime::now();
+        let mut sorted_by_weighted_age: Vec<_> = self
+            .occurrences
+            .iter()
+            .map(|(key, (count, timestamp))| {
+                let age = now.duration_since(*timestamp).unwrap().as_micros();
+                let weighted_age = *count as i64 * OCCURRENCES_WEIGHT + -(age as i64);
+                (weighted_age, *key)
+            })
+            .collect();
+        sorted_by_weighted_age.sort_by(|x, y| x.0.partial_cmp(&y.0).unwrap());
+
+        for i in sorted_by_weighted_age.iter() {
+            self.table.remove(&i.1);
+            self.occurrences.remove(&i.1);
+            if *new_size == self.get_count() {
+                break;
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_execute_cost_table_prune_simple_table() {
+        solana_logger::setup();
+        let capacity: usize = 3;
+        let mut testee = ExecuteCostTable::new(capacity);
+
+        let key1 = Pubkey::new_unique();
+        let key2 = Pubkey::new_unique();
+        let key3 = Pubkey::new_unique();
+
+        testee.upsert(&key1, &1);
+        testee.upsert(&key2, &2);
+        testee.upsert(&key3, &3);
+
+        testee.prune_to(&(capacity - 1));
+
+        // the oldest, key1, should be pruned
+        assert!(testee.get_cost(&key1).is_none());
+        assert!(testee.get_cost(&key2).is_some());
+        assert!(testee.get_cost(&key3).is_some());
+    }
+
+    #[test]
+    fn test_execute_cost_table_prune_weighted_table() {
+        solana_logger::setup();
+        let capacity: usize = 3;
+        let mut testee = ExecuteCostTable::new(capacity);
+
+        let key1 = Pubkey::new_unique();
+        let key2 = Pubkey::new_unique();
+        let key3 = Pubkey::new_unique();
+
+        testee.upsert(&key1, &1);
+        testee.upsert(&key1, &1);
+        testee.upsert(&key2, &2);
+        testee.upsert(&key3, &3);
+
+        testee.prune_to(&(capacity - 1));
+
+        // the oldest, key1, has 2 occurrences; the 2nd oldest, key2, has 1;
+        // expect key2 to be pruned.
+        assert!(testee.get_cost(&key1).is_some());
+        assert!(testee.get_cost(&key2).is_none());
+        assert!(testee.get_cost(&key3).is_some());
+    }
+
+    #[test]
+    fn test_execute_cost_table_upsert_within_capacity() {
+        solana_logger::setup();
+        let mut testee = ExecuteCostTable::default();
+
+        let key1 = Pubkey::new_unique();
+        let key2 = Pubkey::new_unique();
+        let cost1: u64 = 100;
+        let cost2: u64 = 110;
+
+        // query empty table
+        assert!(testee.get_cost(&key1).is_none());
+
+        // insert one record
+        testee.upsert(&key1, &cost1);
+        assert_eq!(1, testee.get_count());
+        assert_eq!(cost1, testee.get_average());
+        assert_eq!(cost1, testee.get_mode());
+        assert_eq!(&cost1, testee.get_cost(&key1).unwrap());
+
+        // insert 2nd record
+        testee.upsert(&key2, &cost2);
+        assert_eq!(2, testee.get_count());
+        assert_eq!((cost1 + cost2) / 2_u64, testee.get_average());
+        assert_eq!(cost2, testee.get_mode());
+        assert_eq!(&cost1, testee.get_cost(&key1).unwrap());
+        assert_eq!(&cost2, testee.get_cost(&key2).unwrap());
+
+        // update 1st record
+        testee.upsert(&key1, &cost2);
+        assert_eq!(2, testee.get_count());
+        assert_eq!(((cost1 + cost2) / 2 + cost2) / 2, testee.get_average());
+        assert_eq!((cost1 + cost2) / 2, testee.get_mode());
+        assert_eq!(&((cost1 + cost2) / 2), testee.get_cost(&key1).unwrap());
+        assert_eq!(&cost2, testee.get_cost(&key2).unwrap());
+    }
+
+    #[test]
+    fn test_execute_cost_table_upsert_exceeds_capacity() {
+        solana_logger::setup();
+        let capacity: usize = 2;
+        let mut testee = ExecuteCostTable::new(capacity);
+
+        let key1 = Pubkey::new_unique();
+        let key2 = Pubkey::new_unique();
+        let key3 = Pubkey::new_unique();
+        let key4 = Pubkey::new_unique();
+        let cost1: u64 = 100;
+        let cost2: u64 = 110;
+        let cost3: u64 = 120;
+        let cost4: u64 = 130;
+
+        // insert one record
+        testee.upsert(&key1, &cost1);
+        assert_eq!(1, testee.get_count());
+        assert_eq!(&cost1, testee.get_cost(&key1).unwrap());
+
+        // insert 2nd record
+        testee.upsert(&key2, &cost2);
+        assert_eq!(2, testee.get_count());
+        assert_eq!(&cost1, testee.get_cost(&key1).unwrap());
+        assert_eq!(&cost2, testee.get_cost(&key2).unwrap());
+
+        // insert 3rd record, pushes out the oldest (eg 1st) record
+        testee.upsert(&key3, &cost3);
+        assert_eq!(2, testee.get_count());
+        assert_eq!((cost2 + cost3) / 2_u64, testee.get_average());
+        assert_eq!(cost3, testee.get_mode());
+        assert!(testee.get_cost(&key1).is_none());
+        assert_eq!(&cost2, testee.get_cost(&key2).unwrap());
+        assert_eq!(&cost3, testee.get_cost(&key3).unwrap());
+
+        // update 2nd record, so the 3rd becomes the oldest
+        // add 4th record, pushes out 3rd key
+        testee.upsert(&key2, &cost1);
+        testee.upsert(&key4, &cost4);
+        assert_eq!(((cost1 + cost2) / 2 + cost4) / 2_u64, testee.get_average());
+        assert_eq!((cost1 + cost2) / 2, testee.get_mode());
+        assert_eq!(2, testee.get_count());
+        assert!(testee.get_cost(&key1).is_none());
+        assert_eq!(&((cost1 + cost2) / 2), testee.get_cost(&key2).unwrap());
+        assert!(testee.get_cost(&key3).is_none());
+        assert_eq!(&cost4, testee.get_cost(&key4).unwrap());
+    }
+}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index e2e4103f2a..d56ccb671f 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -20,6 +20,7 @@ pub mod completed_data_sets_service;
 pub mod consensus;
 pub mod cost_model;
 pub mod cost_tracker;
+pub mod execute_cost_table;
 pub mod fetch_stage;
 pub mod fork_choice;
 pub mod gen_keys;
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 5709b73e25..136db5dfc1 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -13,6 +13,7 @@ use crate::{
     consensus::{
         ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes, SWITCH_FORK_THRESHOLD,
     },
+    cost_model::CostModel,
     fork_choice::{ForkChoice, SelectVoteAndResetForkResult},
     heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
     latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks,
@@ -40,8 +41,8 @@ use solana_rpc::{
     rpc_subscriptions::RpcSubscriptions,
 };
 use solana_runtime::{
-    accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks,
-    commitment::BlockCommitmentCache, vote_sender_types::ReplayVoteSender,
+    accounts_background_service::AbsRequestSender, bank::Bank, bank::ExecuteTimings,
+    bank_forks::BankForks, commitment::BlockCommitmentCache, vote_sender_types::ReplayVoteSender,
 };
 use solana_sdk::{
     clock::{Slot, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS},
@@ -294,6 +295,7 @@ impl ReplayStage {
         gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
         gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
         cluster_slots_update_sender: ClusterSlotsUpdateSender,
+        cost_model: Arc<RwLock<CostModel>>,
     ) -> Self {
         let ReplayStageConfig {
             my_pubkey,
@@ -390,6 +392,7 @@ impl ReplayStage {
                         &mut unfrozen_gossip_verified_vote_hashes,
                         &mut latest_validator_votes_for_frozen_banks,
                         &cluster_slots_update_sender,
+                        &cost_model,
                     );
                     replay_active_banks_time.stop();
@@ -1663,6 +1666,7 @@ impl ReplayStage {
         unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
         latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks,
         cluster_slots_update_sender: &ClusterSlotsUpdateSender,
+        cost_model: &RwLock<CostModel>,
     ) -> bool {
         let mut did_complete_bank = false;
         let mut tx_count = 0;
@@ -1719,6 +1723,11 @@ impl ReplayStage {
                     replay_vote_sender,
                     verify_recyclers,
                 );
+                Self::update_cost_model(&cost_model, &bank_progress.replay_stats.execute_timings);
+                debug!(
+                    "after replayed into bank, updated cost model instruction cost table, current values: {:?}",
+                    cost_model.read().unwrap().get_instruction_cost_table()
+                );
                 match replay_result {
                     Ok(replay_tx_count) => tx_count += replay_tx_count,
                     Err(err) => {
@@ -1908,6 +1917,32 @@ impl ReplayStage {
         new_stats
     }
 
+    fn update_cost_model(cost_model: &RwLock<CostModel>, execute_timings: &ExecuteTimings) {
+        let mut cost_model_mutable = cost_model.write().unwrap();
+        for (program_id, stats) in &execute_timings.details.per_program_timings {
+            let cost = stats.0 / stats.1 as u64;
+            match cost_model_mutable.upsert_instruction_cost(&program_id, &cost) {
+                Ok(c) => {
+                    debug!(
+                        "after replayed into bank, instruction {:?} has averaged cost {}",
+                        program_id, c
+                    );
+                }
+                Err(err) => {
+                    debug!(
+                        "after replayed into bank, instruction {:?} failed to update cost, err: {}",
+                        program_id, err
+                    );
+                }
+            }
+        }
+        drop(cost_model_mutable);
+        debug!(
+            "after replayed into bank, updated cost model instruction cost table, current values: {:?}",
+            cost_model.read().unwrap().get_instruction_cost_table()
+        );
+    }
+
     fn update_propagation_status(
         progress: &mut ProgressMap,
         slot: Slot,
@@ -4910,6 +4945,90 @@ mod tests {
         assert_eq!(tower.last_voted_slot().unwrap(), 1);
     }
 
+    #[test]
+    fn test_update_cost_model_with_empty_execute_timings() {
+        let cost_model = Arc::new(RwLock::new(CostModel::default()));
+        let empty_execute_timings = ExecuteTimings::default();
+        ReplayStage::update_cost_model(&cost_model, &empty_execute_timings);
+
+        assert_eq!(
+            0,
+            cost_model
+                .read()
+                .unwrap()
+                .get_instruction_cost_table()
+                .len()
+        );
+    }
+
+    #[test]
+    fn test_update_cost_model_with_execute_timings() {
+        let cost_model = Arc::new(RwLock::new(CostModel::default()));
+        let mut execute_timings = ExecuteTimings::default();
+
+        let program_key_1 = Pubkey::new_unique();
+        let mut expected_cost: u64;
+
+        // add new program
+        {
+            let accumulated_us: u64 = 1000;
+            let count: u32 = 10;
+            expected_cost = accumulated_us / count as u64;
+
+            execute_timings
+                .details
+                .per_program_timings
+                .insert(program_key_1, (accumulated_us, count));
+            ReplayStage::update_cost_model(&cost_model, &execute_timings);
+            assert_eq!(
+                1,
+                cost_model
+                    .read()
+                    .unwrap()
+                    .get_instruction_cost_table()
+                    .len()
+            );
+            assert_eq!(
+                Some(&expected_cost),
+                cost_model
+                    .read()
+                    .unwrap()
+                    .get_instruction_cost_table()
+                    .get(&program_key_1)
+            );
+        }
+
+        // update program
+        {
+            let accumulated_us: u64 = 2000;
+            let count: u32 = 10;
+            // expect the new cost to be Average(new_value, existing_value)
+            expected_cost = ((accumulated_us / count as u64) + expected_cost) / 2;
+
+            execute_timings
+                .details
+                .per_program_timings
+                .insert(program_key_1, (accumulated_us, count));
+            ReplayStage::update_cost_model(&cost_model, &execute_timings);
+            assert_eq!(
+                1,
+                cost_model
+                    .read()
+                    .unwrap()
+                    .get_instruction_cost_table()
+                    .len()
+            );
+            assert_eq!(
+                Some(&expected_cost),
+                cost_model
+                    .read()
+                    .unwrap()
+                    .get_instruction_cost_table()
+                    .get(&program_key_1)
+            );
+        }
+    }
+
     fn run_compute_and_select_forks(
         bank_forks: &RwLock<BankForks>,
         progress: &mut ProgressMap,
diff --git a/core/src/tpu.rs b/core/src/tpu.rs
index 4d79cd8f7d..c4f1bd5fd0 100644
--- a/core/src/tpu.rs
+++ b/core/src/tpu.rs
@@ -8,6 +8,7 @@ use crate::{
         ClusterInfoVoteListener, GossipDuplicateConfirmedSlotsSender, GossipVerifiedVoteHashSender,
         VerifiedVoteSender, VoteTracker,
     },
+    cost_model::CostModel,
     fetch_stage::FetchStage,
     sigverify::TransactionSigVerifier,
     sigverify_stage::SigVerifyStage,
@@ -69,6 +70,7 @@ impl Tpu {
         bank_notification_sender: Option<BankNotificationSender>,
         tpu_coalesce_ms: u64,
         cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
+        cost_model: &Arc<RwLock<CostModel>>,
     ) -> Self {
         let (packet_sender, packet_receiver) = channel();
         let fetch_stage = FetchStage::new_with_sender(
@@ -110,6 +112,7 @@ impl Tpu {
             verified_vote_packets_receiver,
             transaction_status_sender,
             replay_vote_sender,
+            cost_model,
         );
 
         let broadcast_stage = broadcast_type.new_broadcast_stage(
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 1ec03d843b..f4fe600c1e 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -12,6 +12,7 @@ use crate::{
     cluster_slots::ClusterSlots,
     completed_data_sets_service::CompletedDataSetsSender,
     consensus::Tower,
+    cost_model::CostModel,
     ledger_cleanup_service::LedgerCleanupService,
     replay_stage::{ReplayStage, ReplayStageConfig},
     retransmit_stage::RetransmitStage,
@@ -128,6 +129,7 @@ impl Tvu {
         gossip_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
         tvu_config: TvuConfig,
         max_slots: &Arc<MaxSlots>,
+        cost_model: &Arc<RwLock<CostModel>>,
     ) -> Self {
         let keypair: Arc<Keypair> = cluster_info.keypair.clone();
 
@@ -291,6 +293,7 @@ impl Tvu {
             gossip_confirmed_slots_receiver,
             gossip_verified_vote_hash_receiver,
             cluster_slots_update_sender,
+            cost_model.clone(),
         );
 
         let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
@@ -437,6 +440,7 @@ pub mod tests {
             gossip_confirmed_slots_receiver,
             TvuConfig::default(),
             &Arc::new(MaxSlots::default()),
+            &Arc::new(RwLock::new(CostModel::default())),
         );
         exit.store(true, Ordering::Relaxed);
         tvu.join().unwrap();
diff --git a/core/src/validator.rs b/core/src/validator.rs
index eee6bcd76a..fda0f0108f 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -6,6 +6,7 @@ use crate::{
     cluster_info_vote_listener::VoteTracker,
     completed_data_sets_service::CompletedDataSetsService,
     consensus::{reconcile_blockstore_roots_with_tower, Tower},
+    cost_model::{CostModel, ACCOUNT_MAX_COST, BLOCK_MAX_COST},
     rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
     sample_performance_service::SamplePerformanceService,
     serve_repair::ServeRepair,
@@ -650,6 +651,11 @@ impl Validator {
             bank_forks.read().unwrap().root_bank().deref(),
         ));
 
+        let cost_model = Arc::new(RwLock::new(CostModel::new(
+            ACCOUNT_MAX_COST,
+            BLOCK_MAX_COST,
+        )));
+
         let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
         let (verified_vote_sender, verified_vote_receiver) = unbounded();
         let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
@@ -722,6 +728,7 @@ impl Validator {
                 wait_for_vote_to_start_leader,
             },
             &max_slots,
+            &cost_model,
         );
 
         let tpu = Tpu::new(
@@ -747,6 +754,7 @@ impl Validator {
             bank_notification_sender,
             config.tpu_coalesce_ms,
             cluster_confirmed_slot_sender,
+            &cost_model,
         );
 
         datapoint_info!("validator-new", ("id", id.to_string(), String));