Hoist Slot Leader dependencies up to BankingStage

This commit is contained in:
Greg Fitzgerald 2019-02-16 15:02:21 -07:00
parent b539389741
commit e39094ac37
2 changed files with 188 additions and 189 deletions

View File

@ -7,8 +7,6 @@ use crate::accounts::{Accounts, ErrorCounters, InstructionAccounts, InstructionL
use crate::counter::Counter;
use crate::genesis_block::GenesisBlock;
use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS};
use crate::poh_recorder::{PohRecorder, PohRecorderError};
use crate::result::Error;
use crate::rpc_pubsub::RpcSubscriptions;
use crate::status_cache::StatusCache;
use bincode::deserialize;
@ -322,93 +320,6 @@ impl Bank {
self.accounts.unlock_accounts(txs, results)
}
pub fn process_and_record_transactions(
&self,
txs: &[Transaction],
poh: &PohRecorder,
) -> Result<()> {
let now = Instant::now();
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state
let lock_results = self.lock_accounts(txs);
let lock_time = now.elapsed();
let now = Instant::now();
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
// the likelihood of any single thread getting starved and processing old ids.
// TODO: Banking stage threads should be prioritized to complete faster then this queue
// expires.
let (loaded_accounts, results) =
self.load_and_execute_transactions(txs, lock_results, MAX_ENTRY_IDS as usize / 2);
let load_execute_time = now.elapsed();
let record_time = {
let now = Instant::now();
self.record_transactions(txs, &results, poh)?;
now.elapsed()
};
let commit_time = {
let now = Instant::now();
self.commit_transactions(txs, &loaded_accounts, &results);
now.elapsed()
};
let now = Instant::now();
// Once the accounts are new transactions can enter the pipeline to process them
self.unlock_accounts(&txs, &results);
let unlock_time = now.elapsed();
debug!(
"lock: {}us load_execute: {}us record: {}us commit: {}us unlock: {}us txs_len: {}",
duration_as_us(&lock_time),
duration_as_us(&load_execute_time),
duration_as_us(&record_time),
duration_as_us(&commit_time),
duration_as_us(&unlock_time),
txs.len(),
);
Ok(())
}
fn record_transactions(
&self,
txs: &[Transaction],
results: &[Result<()>],
poh: &PohRecorder,
) -> Result<()> {
let processed_transactions: Vec<_> = results
.iter()
.zip(txs.iter())
.filter_map(|(r, x)| match r {
Ok(_) => Some(x.clone()),
Err(BankError::ProgramError(index, err)) => {
info!("program error {:?}, {:?}", index, err);
Some(x.clone())
}
Err(ref e) => {
debug!("process transaction failed {:?}", e);
None
}
})
.collect();
debug!("processed: {} ", processed_transactions.len());
// unlock all the accounts with errors which are filtered by the above `filter_map`
if !processed_transactions.is_empty() {
let hash = Transaction::hash(&processed_transactions);
// record and unlock will unlock all the successfull transactions
poh.record(hash, processed_transactions).map_err(|e| {
warn!("record failure: {:?}", e);
match e {
Error::PohRecorderError(PohRecorderError::MaxHeightReached) => {
BankError::MaxHeightReached
}
_ => BankError::RecordFailure,
}
})?;
}
Ok(())
}
fn load_accounts(
&self,
txs: &[Transaction],
@ -457,7 +368,7 @@ impl Bank {
.collect()
}
#[allow(clippy::type_complexity)]
fn load_and_execute_transactions(
pub fn load_and_execute_transactions(
&self,
txs: &[Transaction],
lock_results: Vec<Result<()>>,
@ -556,7 +467,7 @@ impl Bank {
(loaded_accounts, executed)
}
fn commit_transactions(
pub fn commit_transactions(
&self,
txs: &[Transaction],
loaded_accounts: &[Result<(InstructionAccounts, InstructionLoaders)>],
@ -727,7 +638,6 @@ mod tests {
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::transaction::Instruction;
use std;
use std::sync::mpsc::channel;
#[test]
fn test_bank_new() {
@ -1090,44 +1000,6 @@ mod tests {
assert!(ids.into_iter().all(move |id| unique.insert(id)));
}
#[test]
fn test_bank_record_transactions() {
let (genesis_block, mint_keypair) = GenesisBlock::new(10_000);
let bank = Arc::new(Bank::new(&genesis_block));
let (entry_sender, entry_receiver) = channel();
let poh_recorder =
PohRecorder::new(bank.clone(), entry_sender, bank.last_id(), std::u64::MAX);
let pubkey = Keypair::new().pubkey();
let transactions = vec![
SystemTransaction::new_move(&mint_keypair, pubkey, 1, genesis_block.last_id(), 0),
SystemTransaction::new_move(&mint_keypair, pubkey, 1, genesis_block.last_id(), 0),
];
let mut results = vec![Ok(()), Ok(())];
bank.record_transactions(&transactions, &results, &poh_recorder)
.unwrap();
let entries = entry_receiver.recv().unwrap();
assert_eq!(entries[0].transactions.len(), transactions.len());
// ProgramErrors should still be recorded
results[0] = Err(BankError::ProgramError(
1,
ProgramError::ResultWithNegativeTokens,
));
bank.record_transactions(&transactions, &results, &poh_recorder)
.unwrap();
let entries = entry_receiver.recv().unwrap();
assert_eq!(entries[0].transactions.len(), transactions.len());
// Other BankErrors should not be recorded
results[0] = Err(BankError::AccountNotFound);
bank.record_transactions(&transactions, &results, &poh_recorder)
.unwrap();
let entries = entry_receiver.recv().unwrap();
assert_eq!(entries[0].transactions.len(), transactions.len() - 1);
}
#[test]
fn test_bank_storage() {
solana_logger::setup();
@ -1175,61 +1047,6 @@ mod tests {
assert_eq!(bank.get_storage_last_id(), storage_last_id);
}
#[test]
fn test_bank_process_and_record_transactions() {
let (genesis_block, mint_keypair) = GenesisBlock::new(10_000);
let bank = Arc::new(Bank::new(&genesis_block));
let pubkey = Keypair::new().pubkey();
let transactions = vec![SystemTransaction::new_move(
&mint_keypair,
pubkey,
1,
genesis_block.last_id(),
0,
)];
let (entry_sender, entry_receiver) = channel();
let mut poh_recorder = PohRecorder::new(
bank.clone(),
entry_sender,
bank.last_id(),
bank.tick_height() + 1,
);
bank.process_and_record_transactions(&transactions, &poh_recorder)
.unwrap();
poh_recorder.tick().unwrap();
let mut need_tick = true;
// read entries until I find mine, might be ticks...
while need_tick {
let entries = entry_receiver.recv().unwrap();
for entry in entries {
if !entry.is_tick() {
assert_eq!(entry.transactions.len(), transactions.len());
assert_eq!(bank.get_balance(&pubkey), 1);
} else {
need_tick = false;
}
}
}
let transactions = vec![SystemTransaction::new_move(
&mint_keypair,
pubkey,
2,
genesis_block.last_id(),
0,
)];
assert_eq!(
bank.process_and_record_transactions(&transactions, &poh_recorder),
Err(BankError::MaxHeightReached)
);
assert_eq!(bank.get_balance(&pubkey), 1);
}
#[test]
fn test_bank_pay_to_self() {
let (genesis_block, mint_keypair) = GenesisBlock::new(1);

View File

@ -2,13 +2,14 @@
//! to contruct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use crate::bank::{Bank, BankError};
use crate::bank::{self, Bank, BankError};
use crate::compute_leader_confirmation_service::ComputeLeaderConfirmationService;
use crate::counter::Counter;
use crate::entry::Entry;
use crate::last_id_queue::MAX_ENTRY_IDS;
use crate::packet::Packets;
use crate::packet::SharedPackets;
use crate::poh_recorder::PohRecorder;
use crate::poh_recorder::{PohRecorder, PohRecorderError};
use crate::poh_service::{PohService, PohServiceConfig};
use crate::result::{Error, Result};
use crate::service::Service;
@ -19,6 +20,7 @@ use log::Level;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing;
use solana_sdk::timing::duration_as_us;
use solana_sdk::transaction::Transaction;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
@ -122,6 +124,92 @@ impl BankingStage {
.collect()
}
fn record_transactions(
txs: &[Transaction],
results: &[bank::Result<()>],
poh: &PohRecorder,
) -> bank::Result<()> {
let processed_transactions: Vec<_> = results
.iter()
.zip(txs.iter())
.filter_map(|(r, x)| match r {
Ok(_) => Some(x.clone()),
Err(BankError::ProgramError(index, err)) => {
info!("program error {:?}, {:?}", index, err);
Some(x.clone())
}
Err(ref e) => {
debug!("process transaction failed {:?}", e);
None
}
})
.collect();
debug!("processed: {} ", processed_transactions.len());
// unlock all the accounts with errors which are filtered by the above `filter_map`
if !processed_transactions.is_empty() {
let hash = Transaction::hash(&processed_transactions);
// record and unlock will unlock all the successfull transactions
poh.record(hash, processed_transactions).map_err(|e| {
warn!("record failure: {:?}", e);
match e {
Error::PohRecorderError(PohRecorderError::MaxHeightReached) => {
BankError::MaxHeightReached
}
_ => BankError::RecordFailure,
}
})?;
}
Ok(())
}
pub fn process_and_record_transactions(
bank: &Bank,
txs: &[Transaction],
poh: &PohRecorder,
) -> bank::Result<()> {
let now = Instant::now();
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state
let lock_results = bank.lock_accounts(txs);
let lock_time = now.elapsed();
let now = Instant::now();
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
// the likelihood of any single thread getting starved and processing old ids.
// TODO: Banking stage threads should be prioritized to complete faster then this queue
// expires.
let (loaded_accounts, results) =
bank.load_and_execute_transactions(txs, lock_results, MAX_ENTRY_IDS as usize / 2);
let load_execute_time = now.elapsed();
let record_time = {
let now = Instant::now();
Self::record_transactions(txs, &results, poh)?;
now.elapsed()
};
let commit_time = {
let now = Instant::now();
bank.commit_transactions(txs, &loaded_accounts, &results);
now.elapsed()
};
let now = Instant::now();
// Once the accounts are new transactions can enter the pipeline to process them
bank.unlock_accounts(&txs, &results);
let unlock_time = now.elapsed();
debug!(
"lock: {}us load_execute: {}us record: {}us commit: {}us unlock: {}us txs_len: {}",
duration_as_us(&lock_time),
duration_as_us(&load_execute_time),
duration_as_us(&record_time),
duration_as_us(&commit_time),
duration_as_us(&unlock_time),
txs.len(),
);
Ok(())
}
/// Sends transactions to the bank.
///
/// Returns the number of transactions successfully processed by the bank, which may be less
@ -135,8 +223,11 @@ impl BankingStage {
while chunk_start != transactions.len() {
let chunk_end = chunk_start + Entry::num_will_fit(&transactions[chunk_start..]);
let result =
bank.process_and_record_transactions(&transactions[chunk_start..chunk_end], poh);
let result = Self::process_and_record_transactions(
bank,
&transactions[chunk_start..chunk_end],
poh,
);
if Err(BankError::MaxHeightReached) == result {
break;
}
@ -264,6 +355,7 @@ mod tests {
use crate::genesis_block::GenesisBlock;
use crate::leader_scheduler::DEFAULT_TICKS_PER_SLOT;
use crate::packet::to_packets;
use solana_sdk::native_program::ProgramError;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
use std::thread::sleep;
@ -497,4 +589,94 @@ mod tests {
assert_eq!(packets.read().unwrap().packets.len(), 1); // TODO: maybe compare actual packet contents too
assert_eq!(*start_index, 0);
}
#[test]
fn test_bank_record_transactions() {
let (genesis_block, mint_keypair) = GenesisBlock::new(10_000);
let bank = Arc::new(Bank::new(&genesis_block));
let (entry_sender, entry_receiver) = channel();
let poh_recorder =
PohRecorder::new(bank.clone(), entry_sender, bank.last_id(), std::u64::MAX);
let pubkey = Keypair::new().pubkey();
let transactions = vec![
SystemTransaction::new_move(&mint_keypair, pubkey, 1, genesis_block.last_id(), 0),
SystemTransaction::new_move(&mint_keypair, pubkey, 1, genesis_block.last_id(), 0),
];
let mut results = vec![Ok(()), Ok(())];
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
let entries = entry_receiver.recv().unwrap();
assert_eq!(entries[0].transactions.len(), transactions.len());
// ProgramErrors should still be recorded
results[0] = Err(BankError::ProgramError(
1,
ProgramError::ResultWithNegativeTokens,
));
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
let entries = entry_receiver.recv().unwrap();
assert_eq!(entries[0].transactions.len(), transactions.len());
// Other BankErrors should not be recorded
results[0] = Err(BankError::AccountNotFound);
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
let entries = entry_receiver.recv().unwrap();
assert_eq!(entries[0].transactions.len(), transactions.len() - 1);
}
#[test]
fn test_bank_process_and_record_transactions() {
let (genesis_block, mint_keypair) = GenesisBlock::new(10_000);
let bank = Arc::new(Bank::new(&genesis_block));
let pubkey = Keypair::new().pubkey();
let transactions = vec![SystemTransaction::new_move(
&mint_keypair,
pubkey,
1,
genesis_block.last_id(),
0,
)];
let (entry_sender, entry_receiver) = channel();
let mut poh_recorder = PohRecorder::new(
bank.clone(),
entry_sender,
bank.last_id(),
bank.tick_height() + 1,
);
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder).unwrap();
poh_recorder.tick().unwrap();
let mut need_tick = true;
// read entries until I find mine, might be ticks...
while need_tick {
let entries = entry_receiver.recv().unwrap();
for entry in entries {
if !entry.is_tick() {
assert_eq!(entry.transactions.len(), transactions.len());
assert_eq!(bank.get_balance(&pubkey), 1);
} else {
need_tick = false;
}
}
}
let transactions = vec![SystemTransaction::new_move(
&mint_keypair,
pubkey,
2,
genesis_block.last_id(),
0,
)];
assert_eq!(
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder),
Err(BankError::MaxHeightReached)
);
assert_eq!(bank.get_balance(&pubkey), 1);
}
}