Bugfix: MultiIterator batch priority guard (#33454)

This commit is contained in:
Andrew Fitzgerald 2023-10-05 09:20:24 -07:00 committed by GitHub
parent 9c2663f7a5
commit 2a17be0eea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 180 additions and 143 deletions

View File

@ -757,13 +757,17 @@ mod tests {
self,
state::{AddressLookupTable, LookupTableMeta},
},
compute_budget,
instruction::InstructionError,
message::{v0, v0::MessageAddressTableLookup, MessageHeader, VersionedMessage},
message::{
v0::{self, MessageAddressTableLookup},
Message, MessageHeader, VersionedMessage,
},
poh_config::PohConfig,
pubkey::Pubkey,
signature::Keypair,
signer::Signer,
system_transaction,
system_instruction, system_transaction,
transaction::{MessageHash, Transaction, VersionedTransaction},
},
solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta},
@ -862,10 +866,11 @@ mod tests {
Arc<Bank>,
Arc<RwLock<PohRecorder>>,
Receiver<WorkingBankEntry>,
GenesisConfigInfo,
JoinHandle<()>,
) {
Blockstore::destroy(ledger_path).unwrap();
let genesis_config_info = create_slow_genesis_config(10_000);
let genesis_config_info = create_slow_genesis_config(100_000_000);
let GenesisConfigInfo {
genesis_config,
mint_keypair,
@ -905,6 +910,7 @@ mod tests {
bank,
poh_recorder,
entry_receiver,
genesis_config_info,
poh_simulator,
)
}
@ -1830,9 +1836,9 @@ mod tests {
fn test_consume_buffered_packets() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
let (transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.read().unwrap().new_recorder();
let recorder: TransactionRecorder = poh_recorder.read().unwrap().new_recorder();
let num_conflicting_transactions = transactions.len();
let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
@ -1903,7 +1909,7 @@ mod tests {
fn test_consume_buffered_packets_sanitization_error() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (mut transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
let (mut transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let duplicate_account_key = transactions[0].message.account_keys[0];
transactions[0]
@ -1959,7 +1965,7 @@ mod tests {
fn test_consume_buffered_packets_retryable() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
let (transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.read().unwrap().new_recorder();
let num_conflicting_transactions = transactions.len();
@ -2048,6 +2054,109 @@ mod tests {
Blockstore::destroy(ledger_path.path()).unwrap();
}
// Regression test for the batch priority guard: while a batch is being built,
// a lower-priority transaction must not take account locks that are needed by
// a higher-priority transaction which was skipped for that batch.
#[test]
fn test_consume_buffered_packets_batch_priority_guard() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (_, bank, poh_recorder, _entry_receiver, genesis_config_info, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.read().unwrap().new_recorder();
// Setup transactions, listed from highest to lowest priority:
// [(AB), (BC), (CD)]
// (AB) and (BC) conflict on account B, and cannot go into the same batch.
// (AB) and (CD) do not conflict. However, (CD) should not be able to take locks needed by (BC).
let keypair_a = Keypair::new();
let keypair_b = Keypair::new();
let keypair_c = Keypair::new();
let keypair_d = Keypair::new();
// Fund all four accounts from the mint keypair (one transfer per account).
for keypair in &[&keypair_a, &keypair_b, &keypair_c, &keypair_d] {
bank.transfer(5_000, &genesis_config_info.mint_keypair, &keypair.pubkey())
.unwrap();
}
// Builds a signed transfer that also carries a compute-unit-price instruction;
// `priority` is the value handed to `set_compute_unit_price`.
let make_prioritized_transfer =
|from: &Keypair, to, lamports, priority| -> Transaction {
let ixs = vec![
system_instruction::transfer(&from.pubkey(), to, lamports),
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(priority),
];
let message = Message::new(&ixs, Some(&from.pubkey()));
Transaction::new(&[from], message, bank.last_blockhash())
};
// Compute-unit prices 3 > 2 > 1: (AB) is the highest priority, (CD) the lowest.
let transactions = vec![
make_prioritized_transfer(&keypair_a, &keypair_b.pubkey(), 1, 3),
make_prioritized_transfer(&keypair_b, &keypair_c.pubkey(), 1, 2),
make_prioritized_transfer(&keypair_c, &keypair_d.pubkey(), 1, 1),
];
let num_conflicting_transactions = transactions.len();
let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let mut buffered_packet_batches =
UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(
deserialized_packets,
num_conflicting_transactions,
),
ThreadType::Transactions,
);
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the working bank in poh_recorder is Some, all packets should be processed.
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
// Here (CD) does not conflict with (AB), but it is still expected to land in
// its own batch; the entry assertions below check exactly that.
poh_recorder.write().unwrap().set_bank(bank, false);
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
let banking_stage_stats = BankingStageStats::default();
consumer.consume_buffered_packets(
&bank_start,
&mut buffered_packet_batches,
&banking_stage_stats,
&mut LeaderSlotMetricsTracker::new(0),
);
// Check that all packets were processed without retrying
assert!(buffered_packet_batches.is_empty());
assert_eq!(
banking_stage_stats
.consumed_buffered_packets_count
.load(Ordering::Relaxed),
num_conflicting_transactions
);
assert_eq!(
banking_stage_stats
.rebuffered_packets_count
.load(Ordering::Relaxed),
0
);
// Use bank to check the number of entries (batches)
// No entry may hold more than one transaction, so every transaction got its own batch.
assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1);
assert_eq!(
bank_start.working_bank.transaction_entries_count(),
4 + num_conflicting_transactions as u64 // 4 for funding transfers
);
// Set the recorder's exit flag, then join the simulator thread.
poh_recorder
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(ledger_path.path()).unwrap();
}
#[test]
fn test_accumulate_execute_units_and_time() {
let mut execute_timings = ExecuteTimings::default();

View File

@ -1,12 +1,9 @@
use {
solana_sdk::{
message::{SanitizedMessage, VersionedMessage},
pubkey::Pubkey,
},
solana_sdk::{message::SanitizedMessage, pubkey::Pubkey},
std::collections::HashSet,
};
/// Wrapper struct to check account locks for a batch of transactions.
/// Wrapper struct to accumulate locks for a batch of transactions.
#[derive(Debug, Default)]
pub struct ReadWriteAccountSet {
/// Set of accounts that are locked for read
@ -16,30 +13,36 @@ pub struct ReadWriteAccountSet {
}
impl ReadWriteAccountSet {
/// Check static account locks for a transaction message.
pub fn check_static_account_locks(&self, message: &VersionedMessage) -> bool {
!message
.static_account_keys()
/// Returns true if all account locks were available and false otherwise.
#[allow(dead_code)]
pub fn check_locks(&self, message: &SanitizedMessage) -> bool {
message
.account_keys()
.iter()
.enumerate()
.any(|(index, pubkey)| {
if message.is_maybe_writable(index) {
!self.can_write(pubkey)
.all(|(index, pubkey)| {
if message.is_writable(index) {
self.can_write(pubkey)
} else {
!self.can_read(pubkey)
self.can_read(pubkey)
}
})
}
/// Check all account locks and if they are available, lock them.
/// Returns true if all account locks are available and false otherwise.
pub fn try_locking(&mut self, message: &SanitizedMessage) -> bool {
if self.check_sanitized_message_account_locks(message) {
self.add_sanitized_message_account_locks(message);
true
/// Add all account locks.
/// Returns true if all account locks were available and false otherwise.
pub fn take_locks(&mut self, message: &SanitizedMessage) -> bool {
message
.account_keys()
.iter()
.enumerate()
.fold(true, |all_available, (index, pubkey)| {
if message.is_writable(index) {
all_available & self.add_write(pubkey)
} else {
false
all_available & self.add_read(pubkey)
}
})
}
/// Clears the read and write sets
@ -48,36 +51,6 @@ impl ReadWriteAccountSet {
self.write_set.clear();
}
/// Check if a sanitized message's account locks are available.
///
/// Walks every account key of `message` and asks whether the lock it needs is
/// free: a write lock when `message.is_writable(index)` is true, a read lock
/// otherwise. Returns true only if no key reports an unavailable lock.
/// Takes `&self`, so the read and write sets are left untouched.
fn check_sanitized_message_account_locks(&self, message: &SanitizedMessage) -> bool {
// `!any(unavailable)` is the same as "every lock is available".
!message
.account_keys()
.iter()
.enumerate()
.any(|(index, pubkey)| {
if message.is_writable(index) {
!self.can_write(pubkey)
} else {
!self.can_read(pubkey)
}
})
}
/// Insert the read and write locks for a sanitized message.
///
/// Every account key of `message` is recorded: keys for which
/// `message.is_writable(index)` is true go through `add_write`, all other keys
/// go through `add_read`. This function performs no availability check of its
/// own before inserting.
fn add_sanitized_message_account_locks(&mut self, message: &SanitizedMessage) {
message
.account_keys()
.iter()
.enumerate()
.for_each(|(index, pubkey)| {
if message.is_writable(index) {
self.add_write(pubkey);
} else {
self.add_read(pubkey);
}
});
}
/// Check if an account can be read-locked
fn can_read(&self, pubkey: &Pubkey) -> bool {
!self.write_set.contains(pubkey)
@ -89,15 +62,21 @@ impl ReadWriteAccountSet {
}
/// Add an account to the read-set.
/// Should only be called after `can_read()` returns true
fn add_read(&mut self, pubkey: &Pubkey) {
/// Returns true if the lock was available.
fn add_read(&mut self, pubkey: &Pubkey) -> bool {
let can_read = self.can_read(pubkey);
self.read_set.insert(*pubkey);
can_read
}
/// Add an account to the write-set.
/// Should only be called after `can_write()` returns true
fn add_write(&mut self, pubkey: &Pubkey) {
/// Returns true if the lock was available.
fn add_write(&mut self, pubkey: &Pubkey) -> bool {
let can_write = self.can_write(pubkey);
self.write_set.insert(*pubkey);
can_write
}
}
@ -197,58 +176,12 @@ mod tests {
Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config))
}
// Helper function (could potentially use test_case in future).
// conflict_index = 0 means write lock conflict
// conflict_index = 1 means read lock conflict
//
// `add_write` selects which kind of lock is pre-inserted on the conflicting key
// (write lock if true, read lock if false); `expectation` is the value that
// `check_static_account_locks` must return afterwards.
fn test_check_static_account_locks(conflict_index: usize, add_write: bool, expectation: bool) {
let message =
create_test_versioned_message(&[Pubkey::new_unique()], &[Pubkey::new_unique()], vec![]);
let mut account_locks = ReadWriteAccountSet::default();
// With an empty lock set the check is expected to pass.
assert!(account_locks.check_static_account_locks(&message));
// Pre-lock one of the message's own static keys to create the scenario under test.
let conflict_key = message.static_account_keys().get(conflict_index).unwrap();
if add_write {
account_locks.add_write(conflict_key);
} else {
account_locks.add_read(conflict_key);
}
assert_eq!(
expectation,
account_locks.check_static_account_locks(&message)
);
}
// An existing write lock on the key at index 0 must make the check fail.
#[test]
fn test_check_static_account_locks_write_write_conflict() {
test_check_static_account_locks(0, true, false);
}
// An existing read lock on the key at index 0 must make the check fail.
#[test]
fn test_check_static_account_locks_read_write_conflict() {
test_check_static_account_locks(0, false, false);
}
// An existing write lock on the key at index 1 must make the check fail.
#[test]
fn test_check_static_account_locks_write_read_conflict() {
test_check_static_account_locks(1, true, false);
}
// An existing read lock on the key at index 1 is not a conflict: the check must still pass.
#[test]
fn test_check_static_account_locks_read_read_non_conflict() {
test_check_static_account_locks(1, false, true);
}
// Helper function (could potentially use test_case in future).
// conflict_index = 0 means write lock conflict with static key
// conflict_index = 1 means read lock conflict with static key
// conflict_index = 2 means write lock conflict with address table key
// conflict_index = 3 means read lock conflict with address table key
fn test_check_sanitized_message_account_locks(
conflict_index: usize,
add_write: bool,
expectation: bool,
) {
fn test_check_and_take_locks(conflict_index: usize, add_write: bool, expectation: bool) {
let bank = create_test_bank();
let (bank, table_address) = create_test_address_lookup_table(bank, 2);
let tx = create_test_sanitized_transaction(
@ -264,7 +197,6 @@ mod tests {
let message = tx.message();
let mut account_locks = ReadWriteAccountSet::default();
assert!(account_locks.check_sanitized_message_account_locks(message));
let conflict_key = message.account_keys().get(conflict_index).unwrap();
if add_write {
@ -272,34 +204,32 @@ mod tests {
} else {
account_locks.add_read(conflict_key);
}
assert_eq!(
expectation,
account_locks.check_sanitized_message_account_locks(message)
);
assert_eq!(expectation, account_locks.check_locks(message));
assert_eq!(expectation, account_locks.take_locks(message));
}
#[test]
fn test_check_sanitized_message_account_locks_write_write_conflict() {
test_check_sanitized_message_account_locks(0, true, false); // static key conflict
test_check_sanitized_message_account_locks(2, true, false); // lookup key conflict
fn test_check_and_take_locks_write_write_conflict() {
test_check_and_take_locks(0, true, false); // static key conflict
test_check_and_take_locks(2, true, false); // lookup key conflict
}
#[test]
fn test_check_sanitized_message_account_locks_read_write_conflict() {
test_check_sanitized_message_account_locks(0, false, false); // static key conflict
test_check_sanitized_message_account_locks(2, false, false); // lookup key conflict
fn test_check_and_take_locks_read_write_conflict() {
test_check_and_take_locks(0, false, false); // static key conflict
test_check_and_take_locks(2, false, false); // lookup key conflict
}
#[test]
fn test_check_sanitized_message_account_locks_write_read_conflict() {
test_check_sanitized_message_account_locks(1, true, false); // static key conflict
test_check_sanitized_message_account_locks(3, true, false); // lookup key conflict
fn test_check_and_take_locks_write_read_conflict() {
test_check_and_take_locks(1, true, false); // static key conflict
test_check_and_take_locks(3, true, false); // lookup key conflict
}
#[test]
fn test_check_sanitized_message_account_locks_read_read_non_conflict() {
test_check_sanitized_message_account_locks(1, false, true); // static key conflict
test_check_sanitized_message_account_locks(3, false, true); // lookup key conflict
fn test_check_and_take_locks_read_read_non_conflict() {
test_check_and_take_locks(1, false, true); // static key conflict
test_check_and_take_locks(3, false, true); // lookup key conflict
}
#[test]

View File

@ -16,7 +16,7 @@ use {
},
itertools::Itertools,
min_max_heap::MinMaxHeap,
solana_measure::measure,
solana_measure::{measure, measure_us},
solana_runtime::bank::Bank,
solana_sdk::{
clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, feature_set::FeatureSet, hash::Hash,
@ -149,18 +149,11 @@ fn consume_scan_should_process_packet(
return ProcessingDecision::Now;
}
// Before sanitization, let's quickly check the static keys (performance optimization)
let message = &packet.transaction().get_message().message;
if !payload.account_locks.check_static_account_locks(message) {
return ProcessingDecision::Later;
}
// Try to deserialize the packet
let (maybe_sanitized_transaction, sanitization_time) = measure!(
// Try to sanitize the packet
let (maybe_sanitized_transaction, sanitization_time_us) = measure_us!(
packet.build_sanitized_transaction(&bank.feature_set, bank.vote_only_bank(), bank)
);
let sanitization_time_us = sanitization_time.as_us();
payload
.slot_metrics_tracker
.increment_transactions_from_packets_us(sanitization_time_us);
@ -181,13 +174,18 @@ fn consume_scan_should_process_packet(
payload
.message_hash_to_transaction
.remove(packet.message_hash());
ProcessingDecision::Never
} else if payload.account_locks.try_locking(message) {
return ProcessingDecision::Never;
}
// Always take locks during batch creation.
// This prevents lower-priority transactions from taking locks
// needed by higher-priority txs that were skipped by this check.
if !payload.account_locks.take_locks(message) {
return ProcessingDecision::Later;
}
payload.sanitized_transactions.push(sanitized_transaction);
ProcessingDecision::Now
} else {
ProcessingDecision::Later
}
} else {
payload
.message_hash_to_transaction