Credit transaction fees to the slot leader

This commit is contained in:
Pankaj Garg 2019-02-19 15:41:40 -08:00 committed by Greg Fitzgerald
parent 3d00992c95
commit a27cdf55e7
4 changed files with 73 additions and 22 deletions

View File

@ -112,18 +112,21 @@ impl AccountsDB {
txs: &[Transaction], txs: &[Transaction],
res: &[Result<()>], res: &[Result<()>],
loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], loaded: &[Result<(InstructionAccounts, InstructionLoaders)>],
) { ) -> u64 {
let mut fee = 0;
for (i, raccs) in loaded.iter().enumerate() { for (i, raccs) in loaded.iter().enumerate() {
if res[i].is_err() || raccs.is_err() { if res[i].is_err() || raccs.is_err() {
continue; continue;
} }
let tx = &txs[i]; let tx = &txs[i];
fee += tx.fee;
let acc = raccs.as_ref().unwrap(); let acc = raccs.as_ref().unwrap();
for (key, account) in tx.account_keys.iter().zip(acc.0.iter()) { for (key, account) in tx.account_keys.iter().zip(acc.0.iter()) {
self.store(purge, key, account); self.store(purge, key, account);
} }
} }
fee
} }
fn load_tx_accounts<U>( fn load_tx_accounts<U>(
checkpoints: &[U], checkpoints: &[U],
@ -372,7 +375,7 @@ impl Accounts {
txs: &[Transaction], txs: &[Transaction],
res: &[Result<()>], res: &[Result<()>],
loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], loaded: &[Result<(InstructionAccounts, InstructionLoaders)>],
) { ) -> u64 {
self.accounts_db self.accounts_db
.write() .write()
.unwrap() .unwrap()

View File

@ -446,9 +446,10 @@ impl Bank {
txs: &[Transaction], txs: &[Transaction],
loaded_accounts: &[Result<(InstructionAccounts, InstructionLoaders)>], loaded_accounts: &[Result<(InstructionAccounts, InstructionLoaders)>],
executed: &[Result<()>], executed: &[Result<()>],
) { ) -> u64 {
let now = Instant::now(); let now = Instant::now();
self.accounts let fee = self
.accounts
.store_accounts(true, txs, executed, loaded_accounts); .store_accounts(true, txs, executed, loaded_accounts);
// once committed there is no way to unroll // once committed there is no way to unroll
@ -459,6 +460,7 @@ impl Bank {
txs.len(), txs.len(),
); );
self.update_transaction_statuses(txs, &executed); self.update_transaction_statuses(txs, &executed);
fee
} }
/// Process a batch of transactions. /// Process a batch of transactions.
@ -468,18 +470,19 @@ impl Bank {
txs: &[Transaction], txs: &[Transaction],
lock_results: Vec<Result<()>>, lock_results: Vec<Result<()>>,
max_age: usize, max_age: usize,
) -> Vec<Result<()>> { ) -> (Vec<Result<()>>, u64) {
let (loaded_accounts, executed) = let (loaded_accounts, executed) =
self.load_and_execute_transactions(txs, lock_results, max_age); self.load_and_execute_transactions(txs, lock_results, max_age);
self.commit_transactions(txs, &loaded_accounts, &executed); let fee = self.commit_transactions(txs, &loaded_accounts, &executed);
executed (executed, fee)
} }
#[must_use] #[must_use]
pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> { pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> {
let lock_results = self.lock_accounts(txs); let lock_results = self.lock_accounts(txs);
let results = self.load_execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS); let (results, _fee) =
self.load_execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS);
self.unlock_accounts(txs, &results); self.unlock_accounts(txs, &results);
results results
} }
@ -526,6 +529,13 @@ impl Bank {
parents parents
} }
pub fn collect_tx_fee(&self, pubkey: &Pubkey, fee: u64) {
if let Some(mut account) = self.get_account(pubkey) {
account.tokens += fee;
self.accounts.store_slow(false, pubkey, &account);
}
}
pub fn get_account(&self, pubkey: &Pubkey) -> Option<Account> { pub fn get_account(&self, pubkey: &Pubkey) -> Option<Account> {
let parents = self.parents(); let parents = self.parents();
let mut accounts = vec![&self.accounts]; let mut accounts = vec![&self.accounts];
@ -888,7 +898,7 @@ mod tests {
let pay_alice = vec![tx1]; let pay_alice = vec![tx1];
let lock_result = bank.lock_accounts(&pay_alice); let lock_result = bank.lock_accounts(&pay_alice);
let results_alice = let (results_alice, fee) =
bank.load_execute_and_commit_transactions(&pay_alice, lock_result, MAX_ENTRY_IDS); bank.load_execute_and_commit_transactions(&pay_alice, lock_result, MAX_ENTRY_IDS);
assert_eq!(results_alice[0], Ok(())); assert_eq!(results_alice[0], Ok(()));

View File

@ -56,22 +56,27 @@ fn ignore_program_errors(results: Vec<Result<()>>) -> Vec<Result<()>> {
.collect() .collect()
} }
fn par_execute_entries(bank: &Bank, entries: &[(&Entry, Vec<Result<()>>)]) -> Result<()> { fn par_execute_entries(bank: &Bank, entries: &[(&Entry, Vec<Result<()>>)]) -> (Result<()>, u64) {
inc_new_counter_info!("bank-par_execute_entries-count", entries.len()); inc_new_counter_info!("bank-par_execute_entries-count", entries.len());
let results: Vec<Result<()>> = entries let results_fees: Vec<(Result<()>, u64)> = entries
.into_par_iter() .into_par_iter()
.map(|(e, lock_results)| { .map(|(e, lock_results)| {
let old_results = bank.load_execute_and_commit_transactions( let (old_results, fee) = bank.load_execute_and_commit_transactions(
&e.transactions, &e.transactions,
lock_results.to_vec(), lock_results.to_vec(),
MAX_ENTRY_IDS, MAX_ENTRY_IDS,
); );
let results = ignore_program_errors(old_results); let results = ignore_program_errors(old_results);
bank.unlock_accounts(&e.transactions, &results); bank.unlock_accounts(&e.transactions, &results);
first_err(&results) (first_err(&results), fee)
}) })
.collect(); .collect();
first_err(&results)
let fee = results_fees.iter().map(|(_, fee)| fee).sum();
let mut results = Vec::new();
results.reserve_exact(results_fees.len());
results = results_fees.into_iter().map(|(res, _)| res).collect();
(first_err(&results[..]), fee)
} }
/// process entries in parallel /// process entries in parallel
@ -83,13 +88,18 @@ fn par_process_entries_with_scheduler(
bank: &Bank, bank: &Bank,
entries: &[Entry], entries: &[Entry],
leader_scheduler: &Arc<RwLock<LeaderScheduler>>, leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Result<()> { ) -> (Result<()>, u64) {
// accumulator for entries that can be processed in parallel // accumulator for entries that can be processed in parallel
let mut mt_group = vec![]; let mut mt_group = vec![];
let mut fees = 0;
for entry in entries { for entry in entries {
if entry.is_tick() { if entry.is_tick() {
// if its a tick, execute the group and register the tick // if its a tick, execute the group and register the tick
par_execute_entries(bank, &mt_group)?; let (res, fee) = par_execute_entries(bank, &mt_group);
fees += fee;
if res.is_err() {
return (res, fees);
}
bank.register_tick(&entry.id); bank.register_tick(&entry.id);
leader_scheduler leader_scheduler
.write() .write()
@ -103,7 +113,11 @@ fn par_process_entries_with_scheduler(
// if any of the locks error out // if any of the locks error out
// execute the current group // execute the current group
if first_err(&lock_results).is_err() { if first_err(&lock_results).is_err() {
par_execute_entries(bank, &mt_group)?; let (res, fee) = par_execute_entries(bank, &mt_group);
fees += fee;
if res.is_err() {
return (res, fees);
}
mt_group = vec![]; mt_group = vec![];
//reset the lock and push the entry //reset the lock and push the entry
bank.unlock_accounts(&entry.transactions, &lock_results); bank.unlock_accounts(&entry.transactions, &lock_results);
@ -114,8 +128,9 @@ fn par_process_entries_with_scheduler(
mt_group.push((entry, lock_results)); mt_group.push((entry, lock_results));
} }
} }
par_execute_entries(bank, &mt_group)?; let (res, fee) = par_execute_entries(bank, &mt_group);
Ok(()) fees += fee;
(res, fees)
} }
/// Process an ordered list of entries. /// Process an ordered list of entries.
@ -123,7 +138,7 @@ pub fn process_entries(
bank: &Bank, bank: &Bank,
entries: &[Entry], entries: &[Entry],
leader_scheduler: &Arc<RwLock<LeaderScheduler>>, leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Result<()> { ) -> (Result<()>, u64) {
par_process_entries_with_scheduler(bank, entries, leader_scheduler) par_process_entries_with_scheduler(bank, entries, leader_scheduler)
} }
@ -389,7 +404,7 @@ mod tests {
fn par_process_entries(bank: &Bank, entries: &[Entry]) -> Result<()> { fn par_process_entries(bank: &Bank, entries: &[Entry]) -> Result<()> {
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default())); let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
par_process_entries_with_scheduler(bank, entries, &leader_scheduler) par_process_entries_with_scheduler(bank, entries, &leader_scheduler).0
} }
fn create_sample_block_with_next_entries_using_keypairs( fn create_sample_block_with_next_entries_using_keypairs(

View File

@ -65,6 +65,7 @@ impl ReplayStage {
last_entry_id: &Arc<RwLock<Hash>>, last_entry_id: &Arc<RwLock<Hash>>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>, leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
fees: &mut u64,
) -> Result<()> { ) -> Result<()> {
// Coalesce all the available entries into a single vote // Coalesce all the available entries into a single vote
submit( submit(
@ -108,7 +109,10 @@ impl ReplayStage {
// If we don't process the entry now, the for loop will exit and the entry // If we don't process the entry now, the for loop will exit and the entry
// will be dropped. // will be dropped.
if 0 == num_ticks_to_next_vote || (i + 1) == entries.len() { if 0 == num_ticks_to_next_vote || (i + 1) == entries.len() {
res = blocktree_processor::process_entries(bank, &entries[0..=i], leader_scheduler); let (res_int, fee) =
blocktree_processor::process_entries(bank, &entries[0..=i], leader_scheduler);
res = res_int;
*fees += fee;
if res.is_err() { if res.is_err() {
// TODO: This will return early from the first entry that has an erroneous // TODO: This will return early from the first entry that has an erroneous
@ -123,6 +127,19 @@ impl ReplayStage {
if 0 == num_ticks_to_next_vote { if 0 == num_ticks_to_next_vote {
subscriptions.notify_subscribers(&bank); subscriptions.notify_subscribers(&bank);
let slot_num = leader_scheduler
.read()
.unwrap()
.tick_height_to_slot(num_ticks);
if let Some(leader) = leader_scheduler
.read()
.unwrap()
.get_leader_for_slot(slot_num)
{
// Credit the accumulated fees to the current leader and reset the fee to 0
bank.collect_tx_fee(&leader, *fees);
*fees = 0;
}
if let Some(voting_keypair) = voting_keypair { if let Some(voting_keypair) = voting_keypair {
let keypair = voting_keypair.as_ref(); let keypair = voting_keypair.as_ref();
let vote = VoteTransaction::new_vote( let vote = VoteTransaction::new_vote(
@ -207,6 +224,8 @@ impl ReplayStage {
) )
}; };
let mut fees = 0;
// Loop through blocktree MAX_ENTRY_RECV_PER_ITER entries at a time for each // Loop through blocktree MAX_ENTRY_RECV_PER_ITER entries at a time for each
// relevant slot to see if there are any available updates // relevant slot to see if there are any available updates
loop { loop {
@ -268,6 +287,7 @@ impl ReplayStage {
&last_entry_id, &last_entry_id,
&leader_scheduler_, &leader_scheduler_,
&subscriptions_, &subscriptions_,
&mut fees,
) { ) {
error!("process_entries failed: {:?}", e); error!("process_entries failed: {:?}", e);
} }
@ -756,6 +776,7 @@ mod test {
let leader_scheduler_config = LeaderSchedulerConfig::default(); let leader_scheduler_config = LeaderSchedulerConfig::default();
let leader_scheduler = LeaderScheduler::new_with_bank(&leader_scheduler_config, &bank); let leader_scheduler = LeaderScheduler::new_with_bank(&leader_scheduler_config, &bank);
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
let mut fee = 0;
let res = ReplayStage::process_entries( let res = ReplayStage::process_entries(
entries.clone(), entries.clone(),
&bank, &bank,
@ -766,6 +787,7 @@ mod test {
&Arc::new(RwLock::new(last_entry_id)), &Arc::new(RwLock::new(last_entry_id)),
&leader_scheduler, &leader_scheduler,
&Arc::new(RpcSubscriptions::default()), &Arc::new(RpcSubscriptions::default()),
&mut fee,
); );
match res { match res {
@ -792,6 +814,7 @@ mod test {
&Arc::new(RwLock::new(last_entry_id)), &Arc::new(RwLock::new(last_entry_id)),
&leader_scheduler, &leader_scheduler,
&Arc::new(RpcSubscriptions::default()), &Arc::new(RpcSubscriptions::default()),
&mut fee,
); );
match res { match res {