Banking stage: Deserialize packets only once

Benchmarks show roughly a 6% improvement. The impact could be more
significant when transactions need to be retried a lot.

after patch:
{'name': 'banking_bench_total', 'median': '72767.43'}
{'name': 'banking_bench_tx_total', 'median': '80240.38'}
{'name': 'banking_bench_success_tx_total', 'median': '72767.43'}
test bench_banking_stage_multi_accounts
... bench:   6,137,264 ns/iter (+/- 1,364,111)
test bench_banking_stage_multi_programs
... bench:  10,086,435 ns/iter (+/- 2,921,440)

before patch:
{'name': 'banking_bench_total', 'median': '68572.26'}
{'name': 'banking_bench_tx_total', 'median': '75704.75'}
{'name': 'banking_bench_success_tx_total', 'median': '68572.26'}
test bench_banking_stage_multi_accounts
... bench:   6,521,007 ns/iter (+/- 1,926,741)
test bench_banking_stage_multi_programs
... bench:  10,526,433 ns/iter (+/- 2,736,530)
This commit is contained in:
Christian Kamm 2022-04-11 17:37:45 +02:00 committed by Trent Nelson
parent f7d557d5ae
commit 97f2eb8e65
2 changed files with 48 additions and 64 deletions

View File

@ -23,7 +23,7 @@ use {
solana_perf::{
cuda_runtime::PinnedVec,
data_budget::DataBudget,
packet::{limited_deserialize, Packet, PacketBatch, PACKETS_PER_BATCH},
packet::{Packet, PacketBatch, PACKETS_PER_BATCH},
perf_libs,
},
solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder},
@ -45,13 +45,10 @@ use {
MAX_TRANSACTION_FORWARDING_DELAY_GPU,
},
feature_set,
message::Message,
pubkey::Pubkey,
saturating_add_assign,
timing::{duration_as_ms, timestamp, AtomicInterval},
transaction::{
self, AddressLoader, SanitizedTransaction, TransactionError, VersionedTransaction,
},
transaction::{self, AddressLoader, SanitizedTransaction, TransactionError},
transport::TransportError,
},
solana_transaction_status::token_balances::{
@ -558,7 +555,6 @@ impl BankingStage {
let mut reached_end_of_slot: Option<EndOfSlot> = None;
RetainMut::retain_mut(buffered_packet_batches, |deserialized_packet_batch| {
let packet_batch = &deserialized_packet_batch.packet_batch;
let original_unprocessed_indexes = deserialized_packet_batch
.unprocessed_packets
.keys()
@ -572,8 +568,7 @@ impl BankingStage {
let should_retain = if let Some(bank) = &end_of_slot.working_bank {
let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot(
bank,
packet_batch,
&original_unprocessed_indexes,
&deserialized_packet_batch.unprocessed_packets,
my_pubkey,
end_of_slot.next_slot_leader,
banking_stage_stats,
@ -624,8 +619,7 @@ impl BankingStage {
&working_bank,
&bank_creation_time,
recorder,
packet_batch,
original_unprocessed_indexes.to_owned(),
&deserialized_packet_batch.unprocessed_packets,
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
@ -1685,32 +1679,27 @@ impl BankingStage {
// with their packet indexes.
#[allow(clippy::needless_collect)]
fn transactions_from_packets(
packet_batch: &PacketBatch,
transaction_indexes: &[usize],
deserialized_packet_batch: &HashMap<usize, DeserializedPacket>,
feature_set: &Arc<feature_set::FeatureSet>,
votes_only: bool,
address_loader: impl AddressLoader,
) -> (Vec<SanitizedTransaction>, Vec<usize>) {
transaction_indexes
deserialized_packet_batch
.iter()
.filter_map(|tx_index| {
let p = &packet_batch.packets[*tx_index];
if votes_only && !p.meta.is_simple_vote_tx() {
.filter_map(|(&tx_index, deserialized_packet)| {
if votes_only && !deserialized_packet.is_simple_vote {
return None;
}
let tx: VersionedTransaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?;
let message_bytes = DeserializedPacketBatch::packet_message(p)?;
let message_hash = Message::hash_raw_message(message_bytes);
let tx = SanitizedTransaction::try_create(
tx,
message_hash,
Some(p.meta.is_simple_vote_tx()),
deserialized_packet.versioned_transaction.clone(),
deserialized_packet.message_hash,
Some(deserialized_packet.is_simple_vote),
address_loader.clone(),
)
.ok()?;
tx.verify_precompiles(feature_set).ok()?;
Some((tx, *tx_index))
Some((tx, tx_index))
})
.unzip()
}
@ -1759,8 +1748,7 @@ impl BankingStage {
bank: &Arc<Bank>,
bank_creation_time: &Instant,
poh: &TransactionRecorder,
packet_batch: &PacketBatch,
packet_indexes: Vec<usize>,
deserialized_packet_batch: &HashMap<usize, DeserializedPacket>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
@ -1771,8 +1759,7 @@ impl BankingStage {
let ((transactions, transaction_to_packet_indexes), packet_conversion_time) = Measure::this(
|_| {
Self::transactions_from_packets(
packet_batch,
&packet_indexes,
deserialized_packet_batch,
&bank.feature_set,
bank.vote_only_bank(),
bank.as_ref(),
@ -1860,8 +1847,7 @@ impl BankingStage {
fn filter_unprocessed_packets_at_end_of_slot(
bank: &Arc<Bank>,
packet_batch: &PacketBatch,
transaction_indexes: &[usize],
deserialized_packet_batch: &HashMap<usize, DeserializedPacket>,
my_pubkey: &Pubkey,
next_leader: Option<Pubkey>,
banking_stage_stats: &BankingStageStats,
@ -1871,15 +1857,17 @@ impl BankingStage {
// Filtering helps if we were going to forward the packets to some other node
if let Some(leader) = next_leader {
if leader == *my_pubkey {
return transaction_indexes.to_vec();
return deserialized_packet_batch
.keys()
.cloned()
.collect::<Vec<usize>>();
}
}
let mut unprocessed_packet_conversion_time =
Measure::start("unprocessed_packet_conversion");
let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
packet_batch,
transaction_indexes,
deserialized_packet_batch,
&bank.feature_set,
bank.vote_only_bank(),
bank.as_ref(),
@ -2113,7 +2101,7 @@ mod tests {
get_tmp_ledger_path_auto_delete,
leader_schedule_cache::LeaderScheduleCache,
},
solana_perf::packet::{to_packet_batches, PacketFlags},
solana_perf::packet::{limited_deserialize, to_packet_batches, PacketFlags},
solana_poh::{
poh_recorder::{create_test_recorder, Record, WorkingBankEntry},
poh_service::PohService,
@ -2133,7 +2121,10 @@ mod tests {
signature::{Keypair, Signer},
system_instruction::SystemError,
system_transaction,
transaction::{MessageHash, SimpleAddressLoader, Transaction, TransactionError},
transaction::{
MessageHash, SimpleAddressLoader, Transaction, TransactionError,
VersionedTransaction,
},
},
solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace},
solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta},
@ -4240,7 +4231,7 @@ mod tests {
fn make_test_packets(
transactions: Vec<Transaction>,
vote_indexes: Vec<usize>,
) -> (PacketBatch, Vec<usize>) {
) -> DeserializedPacketBatch {
let capacity = transactions.len();
let mut packet_batch = PacketBatch::with_capacity(capacity);
let mut packet_indexes = Vec::with_capacity(capacity);
@ -4252,7 +4243,7 @@ mod tests {
for index in vote_indexes.iter() {
packet_batch.packets[*index].meta.flags |= PacketFlags::SIMPLE_VOTE_TX;
}
(packet_batch, packet_indexes)
DeserializedPacketBatch::new(packet_batch, packet_indexes, false)
}
#[test]
@ -4270,28 +4261,30 @@ mod tests {
&keypair,
None,
);
let sorted = |mut v: Vec<usize>| {
v.sort_unstable();
v
};
// packets with no votes
{
let vote_indexes = vec![];
let (packet_batch, packet_indexes) =
let deserialized_packet_batch =
make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes);
let mut votes_only = false;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packet_batch,
&packet_indexes,
&deserialized_packet_batch.unprocessed_packets,
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
);
assert_eq!(2, txs.len());
assert_eq!(vec![0, 1], tx_packet_index);
assert_eq!(vec![0, 1], sorted(tx_packet_index));
votes_only = true;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packet_batch,
&packet_indexes,
&deserialized_packet_batch.unprocessed_packets,
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
@ -4303,63 +4296,59 @@ mod tests {
// packets with some votes
{
let vote_indexes = vec![0, 2];
let (packet_batch, packet_indexes) = make_test_packets(
let deserialized_packet_batch = make_test_packets(
vec![vote_tx.clone(), transfer_tx, vote_tx.clone()],
vote_indexes,
);
let mut votes_only = false;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packet_batch,
&packet_indexes,
&deserialized_packet_batch.unprocessed_packets,
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
);
assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index);
assert_eq!(vec![0, 1, 2], sorted(tx_packet_index));
votes_only = true;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packet_batch,
&packet_indexes,
&deserialized_packet_batch.unprocessed_packets,
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
);
assert_eq!(2, txs.len());
assert_eq!(vec![0, 2], tx_packet_index);
assert_eq!(vec![0, 2], sorted(tx_packet_index));
}
// packets with all votes
{
let vote_indexes = vec![0, 1, 2];
let (packet_batch, packet_indexes) = make_test_packets(
let deserialized_packet_batch = make_test_packets(
vec![vote_tx.clone(), vote_tx.clone(), vote_tx],
vote_indexes,
);
let mut votes_only = false;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packet_batch,
&packet_indexes,
&deserialized_packet_batch.unprocessed_packets,
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
);
assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index);
assert_eq!(vec![0, 1, 2], sorted(tx_packet_index));
votes_only = true;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packet_batch,
&packet_indexes,
&deserialized_packet_batch.unprocessed_packets,
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
);
assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index);
assert_eq!(vec![0, 1, 2], sorted(tx_packet_index));
}
}

View File

@ -15,14 +15,9 @@ use {
/// SanitizedTransaction
#[derive(Debug, Default)]
pub struct DeserializedPacket {
#[allow(dead_code)]
versioned_transaction: VersionedTransaction,
#[allow(dead_code)]
message_hash: Hash,
#[allow(dead_code)]
is_simple_vote: bool,
pub versioned_transaction: VersionedTransaction,
pub message_hash: Hash,
pub is_simple_vote: bool,
}
/// Defines the type of entry in `UnprocessedPacketBatches`, it holds original packet_batch