diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index c77f7b31b3..b471399966 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -112,18 +112,21 @@ impl AccountsDB { txs: &[Transaction], res: &[Result<()>], loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], - ) { + ) -> u64 { + let mut fee = 0; for (i, raccs) in loaded.iter().enumerate() { if res[i].is_err() || raccs.is_err() { continue; } let tx = &txs[i]; + fee += tx.fee; let acc = raccs.as_ref().unwrap(); for (key, account) in tx.account_keys.iter().zip(acc.0.iter()) { self.store(purge, key, account); } } + fee } fn load_tx_accounts( checkpoints: &[U], @@ -372,7 +375,7 @@ impl Accounts { txs: &[Transaction], res: &[Result<()>], loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], - ) { + ) -> u64 { self.accounts_db .write() .unwrap() diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index e6d699d53a..d5c46726e6 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -446,9 +446,10 @@ impl Bank { txs: &[Transaction], loaded_accounts: &[Result<(InstructionAccounts, InstructionLoaders)>], executed: &[Result<()>], - ) { + ) -> u64 { let now = Instant::now(); - self.accounts + let fee = self + .accounts .store_accounts(true, txs, executed, loaded_accounts); // once committed there is no way to unroll @@ -459,6 +460,7 @@ impl Bank { txs.len(), ); self.update_transaction_statuses(txs, &executed); + fee } /// Process a batch of transactions. @@ -468,18 +470,19 @@ impl Bank { txs: &[Transaction], lock_results: Vec>, max_age: usize, - ) -> Vec> { + ) -> (Vec>, u64) { let (loaded_accounts, executed) = self.load_and_execute_transactions(txs, lock_results, max_age); - self.commit_transactions(txs, &loaded_accounts, &executed); - executed + let fee = self.commit_transactions(txs, &loaded_accounts, &executed); + (executed, fee) } #[must_use] pub fn process_transactions(&self, txs: &[Transaction]) -> Vec> { let lock_results = self.lock_accounts(txs); - let results = self.load_execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS); + let (results, _fee) = + self.load_execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS); self.unlock_accounts(txs, &results); results } @@ -526,6 +529,13 @@ impl Bank { parents } + pub fn collect_tx_fee(&self, pubkey: &Pubkey, fee: u64) { + if let Some(mut account) = self.get_account(pubkey) { + account.tokens += fee; + self.accounts.store_slow(false, pubkey, &account); + } + } + pub fn get_account(&self, pubkey: &Pubkey) -> Option { let parents = self.parents(); let mut accounts = vec![&self.accounts]; @@ -888,7 +898,7 @@ mod tests { let pay_alice = vec![tx1]; let lock_result = bank.lock_accounts(&pay_alice); - let results_alice = + let (results_alice, fee) = bank.load_execute_and_commit_transactions(&pay_alice, lock_result, MAX_ENTRY_IDS); assert_eq!(results_alice[0], Ok(())); diff --git a/src/blocktree_processor.rs b/src/blocktree_processor.rs index 22ed666374..e022b5d21d 100644 --- a/src/blocktree_processor.rs +++ b/src/blocktree_processor.rs @@ -56,22 +56,27 @@ fn ignore_program_errors(results: Vec>) -> Vec> { .collect() } -fn par_execute_entries(bank: &Bank, entries: &[(&Entry, Vec>)]) -> Result<()> { +fn par_execute_entries(bank: &Bank, entries: &[(&Entry, Vec>)]) -> (Result<()>, u64) { inc_new_counter_info!("bank-par_execute_entries-count", entries.len()); - let results: Vec> = entries + let results_fees: Vec<(Result<()>, u64)> = entries .into_par_iter() .map(|(e, lock_results)| { - let old_results = bank.load_execute_and_commit_transactions( + let (old_results, fee) = bank.load_execute_and_commit_transactions( &e.transactions, lock_results.to_vec(), MAX_ENTRY_IDS, ); let results = ignore_program_errors(old_results); bank.unlock_accounts(&e.transactions, &results); - first_err(&results) + (first_err(&results), fee) }) .collect(); - first_err(&results) + + let fee = results_fees.iter().map(|(_, fee)| fee).sum(); + let mut results = Vec::new(); + results.reserve_exact(results_fees.len()); + results = results_fees.into_iter().map(|(res, _)| res).collect(); + (first_err(&results[..]), fee) } /// process entries in parallel @@ -83,13 +88,18 @@ fn par_process_entries_with_scheduler( bank: &Bank, entries: &[Entry], leader_scheduler: &Arc>, -) -> Result<()> { +) -> (Result<()>, u64) { // accumulator for entries that can be processed in parallel let mut mt_group = vec![]; + let mut fees = 0; for entry in entries { if entry.is_tick() { // if its a tick, execute the group and register the tick - par_execute_entries(bank, &mt_group)?; + let (res, fee) = par_execute_entries(bank, &mt_group); + fees += fee; + if res.is_err() { + return (res, fees); + } bank.register_tick(&entry.id); leader_scheduler .write() @@ -103,7 +113,11 @@ fn par_process_entries_with_scheduler( // if any of the locks error out // execute the current group if first_err(&lock_results).is_err() { - par_execute_entries(bank, &mt_group)?; + let (res, fee) = par_execute_entries(bank, &mt_group); + fees += fee; + if res.is_err() { + return (res, fees); + } mt_group = vec![]; //reset the lock and push the entry bank.unlock_accounts(&entry.transactions, &lock_results); @@ -114,8 +128,9 @@ fn par_process_entries_with_scheduler( mt_group.push((entry, lock_results)); } } - par_execute_entries(bank, &mt_group)?; - Ok(()) + let (res, fee) = par_execute_entries(bank, &mt_group); + fees += fee; + (res, fees) } /// Process an ordered list of entries. @@ -123,7 +138,7 @@ pub fn process_entries( bank: &Bank, entries: &[Entry], leader_scheduler: &Arc>, -) -> Result<()> { +) -> (Result<()>, u64) { par_process_entries_with_scheduler(bank, entries, leader_scheduler) } @@ -389,7 +404,7 @@ mod tests { fn par_process_entries(bank: &Bank, entries: &[Entry]) -> Result<()> { let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default())); - par_process_entries_with_scheduler(bank, entries, &leader_scheduler) + par_process_entries_with_scheduler(bank, entries, &leader_scheduler).0 } fn create_sample_block_with_next_entries_using_keypairs( diff --git a/src/replay_stage.rs b/src/replay_stage.rs index c077e65f95..2d2ce82866 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -65,6 +65,7 @@ impl ReplayStage { last_entry_id: &Arc>, leader_scheduler: &Arc>, subscriptions: &Arc, + fees: &mut u64, ) -> Result<()> { // Coalesce all the available entries into a single vote submit( @@ -108,7 +109,10 @@ impl ReplayStage { // If we don't process the entry now, the for loop will exit and the entry // will be dropped. if 0 == num_ticks_to_next_vote || (i + 1) == entries.len() { - res = blocktree_processor::process_entries(bank, &entries[0..=i], leader_scheduler); + let (res_int, fee) = + blocktree_processor::process_entries(bank, &entries[0..=i], leader_scheduler); + res = res_int; + *fees += fee; if res.is_err() { // TODO: This will return early from the first entry that has an erroneous @@ -123,6 +127,19 @@ impl ReplayStage { if 0 == num_ticks_to_next_vote { subscriptions.notify_subscribers(&bank); + let slot_num = leader_scheduler + .read() + .unwrap() + .tick_height_to_slot(num_ticks); + if let Some(leader) = leader_scheduler + .read() + .unwrap() + .get_leader_for_slot(slot_num) + { + // Credit the accumulated fees to the current leader and reset the fee to 0 + bank.collect_tx_fee(&leader, *fees); + *fees = 0; + } if let Some(voting_keypair) = voting_keypair { let keypair = voting_keypair.as_ref(); let vote = VoteTransaction::new_vote( @@ -207,6 +224,8 @@ impl ReplayStage { ) }; + let mut fees = 0; + // Loop through blocktree MAX_ENTRY_RECV_PER_ITER entries at a time for each // relevant slot to see if there are any available updates loop { @@ -268,6 +287,7 @@ impl ReplayStage { &last_entry_id, &leader_scheduler_, &subscriptions_, + &mut fees, ) { error!("process_entries failed: {:?}", e); } @@ -756,6 +776,7 @@ mod test { let leader_scheduler_config = LeaderSchedulerConfig::default(); let leader_scheduler = LeaderScheduler::new_with_bank(&leader_scheduler_config, &bank); let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); + let mut fee = 0; let res = ReplayStage::process_entries( entries.clone(), &bank, @@ -766,6 +787,7 @@ mod test { &Arc::new(RwLock::new(last_entry_id)), &leader_scheduler, &Arc::new(RpcSubscriptions::default()), + &mut fee, ); match res { @@ -792,6 +814,7 @@ mod test { &Arc::new(RwLock::new(last_entry_id)), &leader_scheduler, &Arc::new(RpcSubscriptions::default()), + &mut fee, ); match res {