Feature/multi-iterator-scanner-read-locks (#28862)
This commit is contained in:
parent
9327658007
commit
bdd162492c
|
@ -651,7 +651,7 @@ impl BankingStage {
|
|||
|
||||
// Clear payload for next iteration
|
||||
payload.sanitized_transactions.clear();
|
||||
payload.write_accounts.clear();
|
||||
payload.account_locks.clear();
|
||||
|
||||
let ProcessTransactionsSummary {
|
||||
reached_max_poh_height,
|
||||
|
|
|
@ -47,6 +47,7 @@ pub mod poh_timing_report_service;
|
|||
pub mod poh_timing_reporter;
|
||||
pub mod progress_map;
|
||||
pub mod qos_service;
|
||||
pub mod read_write_account_set;
|
||||
pub mod repair_generic_traversal;
|
||||
pub mod repair_response;
|
||||
pub mod repair_service;
|
||||
|
|
|
@ -0,0 +1,354 @@
|
|||
use {
|
||||
solana_sdk::{
|
||||
message::{SanitizedMessage, VersionedMessage},
|
||||
pubkey::Pubkey,
|
||||
},
|
||||
std::collections::HashSet,
|
||||
};
|
||||
|
||||
/// Wrapper struct to check account locks for a batch of transactions.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ReadWriteAccountSet {
|
||||
/// Set of accounts that are locked for read
|
||||
read_set: HashSet<Pubkey>,
|
||||
/// Set of accounts that are locked for write
|
||||
write_set: HashSet<Pubkey>,
|
||||
}
|
||||
|
||||
impl ReadWriteAccountSet {
|
||||
/// Check static account locks for a transaction message.
|
||||
pub fn check_static_account_locks(&self, message: &VersionedMessage) -> bool {
|
||||
!message
|
||||
.static_account_keys()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.any(|(index, pubkey)| {
|
||||
if message.is_maybe_writable(index) {
|
||||
!self.can_write(pubkey)
|
||||
} else {
|
||||
!self.can_read(pubkey)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Check all account locks and if they are available, lock them.
|
||||
/// Returns true if all account locks are available and false otherwise.
|
||||
pub fn try_locking(&mut self, message: &SanitizedMessage) -> bool {
|
||||
if self.check_sanitized_message_account_locks(message) {
|
||||
self.add_sanitized_message_account_locks(message);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Clears the read and write sets
|
||||
pub fn clear(&mut self) {
|
||||
self.read_set.clear();
|
||||
self.write_set.clear();
|
||||
}
|
||||
|
||||
/// Check if a sanitized message's account locks are available.
|
||||
fn check_sanitized_message_account_locks(&self, message: &SanitizedMessage) -> bool {
|
||||
!message
|
||||
.account_keys()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.any(|(index, pubkey)| {
|
||||
if message.is_writable(index) {
|
||||
!self.can_write(pubkey)
|
||||
} else {
|
||||
!self.can_read(pubkey)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Insert the read and write locks for a sanitized message.
|
||||
fn add_sanitized_message_account_locks(&mut self, message: &SanitizedMessage) {
|
||||
message
|
||||
.account_keys()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.for_each(|(index, pubkey)| {
|
||||
if message.is_writable(index) {
|
||||
self.add_write(pubkey);
|
||||
} else {
|
||||
self.add_read(pubkey);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Check if an account can be read-locked
|
||||
fn can_read(&self, pubkey: &Pubkey) -> bool {
|
||||
!self.write_set.contains(pubkey)
|
||||
}
|
||||
|
||||
/// Check if an account can be write-locked
|
||||
fn can_write(&self, pubkey: &Pubkey) -> bool {
|
||||
!self.write_set.contains(pubkey) && !self.read_set.contains(pubkey)
|
||||
}
|
||||
|
||||
/// Add an account to the read-set.
|
||||
/// Should only be called after `can_read()` returns true
|
||||
fn add_read(&mut self, pubkey: &Pubkey) {
|
||||
self.read_set.insert(*pubkey);
|
||||
}
|
||||
|
||||
/// Add an account to the write-set.
|
||||
/// Should only be called after `can_write()` returns true
|
||||
fn add_write(&mut self, pubkey: &Pubkey) {
|
||||
self.write_set.insert(*pubkey);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use {
|
||||
super::ReadWriteAccountSet,
|
||||
solana_address_lookup_table_program::state::{AddressLookupTable, LookupTableMeta},
|
||||
solana_ledger::genesis_utils::GenesisConfigInfo,
|
||||
solana_runtime::{bank::Bank, genesis_utils::create_genesis_config},
|
||||
solana_sdk::{
|
||||
account::AccountSharedData,
|
||||
hash::Hash,
|
||||
message::{
|
||||
v0::{self, MessageAddressTableLookup},
|
||||
MessageHeader, VersionedMessage,
|
||||
},
|
||||
pubkey::Pubkey,
|
||||
signature::Keypair,
|
||||
signer::Signer,
|
||||
transaction::{MessageHash, SanitizedTransaction, VersionedTransaction},
|
||||
},
|
||||
std::{borrow::Cow, sync::Arc},
|
||||
};
|
||||
|
||||
fn create_test_versioned_message(
|
||||
write_keys: &[Pubkey],
|
||||
read_keys: &[Pubkey],
|
||||
address_table_lookups: Vec<MessageAddressTableLookup>,
|
||||
) -> VersionedMessage {
|
||||
VersionedMessage::V0(v0::Message {
|
||||
header: MessageHeader {
|
||||
num_required_signatures: write_keys.len() as u8,
|
||||
num_readonly_signed_accounts: 0,
|
||||
num_readonly_unsigned_accounts: read_keys.len() as u8,
|
||||
},
|
||||
recent_blockhash: Hash::default(),
|
||||
account_keys: write_keys.iter().chain(read_keys.iter()).copied().collect(),
|
||||
address_table_lookups,
|
||||
instructions: vec![],
|
||||
})
|
||||
}
|
||||
|
||||
fn create_test_sanitized_transaction(
|
||||
write_keypair: &Keypair,
|
||||
read_keys: &[Pubkey],
|
||||
address_table_lookups: Vec<MessageAddressTableLookup>,
|
||||
bank: &Bank,
|
||||
) -> SanitizedTransaction {
|
||||
let message = create_test_versioned_message(
|
||||
&[write_keypair.pubkey()],
|
||||
read_keys,
|
||||
address_table_lookups,
|
||||
);
|
||||
SanitizedTransaction::try_create(
|
||||
VersionedTransaction::try_new(message, &[write_keypair]).unwrap(),
|
||||
MessageHash::Compute,
|
||||
Some(false),
|
||||
bank,
|
||||
true, // require_static_program_ids
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn create_test_address_lookup_table(
|
||||
bank: Arc<Bank>,
|
||||
num_addresses: usize,
|
||||
) -> (Arc<Bank>, Pubkey) {
|
||||
let mut addresses = Vec::with_capacity(num_addresses);
|
||||
addresses.resize_with(num_addresses, Pubkey::new_unique);
|
||||
let address_lookup_table = AddressLookupTable {
|
||||
meta: LookupTableMeta {
|
||||
authority: None,
|
||||
..LookupTableMeta::default()
|
||||
},
|
||||
addresses: Cow::Owned(addresses),
|
||||
};
|
||||
|
||||
let address_table_key = Pubkey::new_unique();
|
||||
let data = address_lookup_table.serialize_for_tests().unwrap();
|
||||
let mut account =
|
||||
AccountSharedData::new(1, data.len(), &solana_address_lookup_table_program::id());
|
||||
account.set_data(data);
|
||||
bank.store_account(&address_table_key, &account);
|
||||
|
||||
(
|
||||
Arc::new(Bank::new_from_parent(
|
||||
&bank,
|
||||
&Pubkey::new_unique(),
|
||||
bank.slot() + 1,
|
||||
)),
|
||||
address_table_key,
|
||||
)
|
||||
}
|
||||
|
||||
fn create_test_bank() -> Arc<Bank> {
|
||||
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
|
||||
Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config))
|
||||
}
|
||||
|
||||
// Helper function (could potentially use test_case in future).
|
||||
// conflict_index = 0 means write lock conflict
|
||||
// conflict_index = 1 means read lock conflict
|
||||
fn test_check_static_account_locks(conflict_index: usize, add_write: bool, expectation: bool) {
|
||||
let message =
|
||||
create_test_versioned_message(&[Pubkey::new_unique()], &[Pubkey::new_unique()], vec![]);
|
||||
|
||||
let mut account_locks = ReadWriteAccountSet::default();
|
||||
assert!(account_locks.check_static_account_locks(&message));
|
||||
|
||||
let conflict_key = message.static_account_keys().get(conflict_index).unwrap();
|
||||
if add_write {
|
||||
account_locks.add_write(conflict_key);
|
||||
} else {
|
||||
account_locks.add_read(conflict_key);
|
||||
}
|
||||
assert_eq!(
|
||||
expectation,
|
||||
account_locks.check_static_account_locks(&message)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_check_static_account_locks_write_write_conflict() {
|
||||
test_check_static_account_locks(0, true, false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_check_static_account_locks_read_write_conflict() {
|
||||
test_check_static_account_locks(0, false, false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_check_static_account_locks_write_read_conflict() {
|
||||
test_check_static_account_locks(1, true, false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_check_static_account_locks_read_read_non_conflict() {
|
||||
test_check_static_account_locks(1, false, true);
|
||||
}
|
||||
|
||||
// Helper function (could potentially use test_case in future).
|
||||
// conflict_index = 0 means write lock conflict with static key
|
||||
// conflict_index = 1 means read lock conflict with static key
|
||||
// conflict_index = 2 means write lock conflict with address table key
|
||||
// conflict_index = 3 means read lock conflict with address table key
|
||||
fn test_check_sanitized_message_account_locks(
|
||||
conflict_index: usize,
|
||||
add_write: bool,
|
||||
expectation: bool,
|
||||
) {
|
||||
let bank = create_test_bank();
|
||||
let (bank, table_address) = create_test_address_lookup_table(bank, 2);
|
||||
let tx = create_test_sanitized_transaction(
|
||||
&Keypair::new(),
|
||||
&[Pubkey::new_unique()],
|
||||
vec![MessageAddressTableLookup {
|
||||
account_key: table_address,
|
||||
writable_indexes: vec![0],
|
||||
readonly_indexes: vec![1],
|
||||
}],
|
||||
&bank,
|
||||
);
|
||||
let message = tx.message();
|
||||
|
||||
let mut account_locks = ReadWriteAccountSet::default();
|
||||
assert!(account_locks.check_sanitized_message_account_locks(message));
|
||||
|
||||
let conflict_key = message.account_keys().get(conflict_index).unwrap();
|
||||
if add_write {
|
||||
account_locks.add_write(conflict_key);
|
||||
} else {
|
||||
account_locks.add_read(conflict_key);
|
||||
}
|
||||
assert_eq!(
|
||||
expectation,
|
||||
account_locks.check_sanitized_message_account_locks(message)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_check_sanitized_message_account_locks_write_write_conflict() {
|
||||
test_check_sanitized_message_account_locks(0, true, false); // static key conflict
|
||||
test_check_sanitized_message_account_locks(2, true, false); // lookup key conflict
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_check_sanitized_message_account_locks_read_write_conflict() {
|
||||
test_check_sanitized_message_account_locks(0, false, false); // static key conflict
|
||||
test_check_sanitized_message_account_locks(2, false, false); // lookup key conflict
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_check_sanitized_message_account_locks_write_read_conflict() {
|
||||
test_check_sanitized_message_account_locks(1, true, false); // static key conflict
|
||||
test_check_sanitized_message_account_locks(3, true, false); // lookup key conflict
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_check_sanitized_message_account_locks_read_read_non_conflict() {
|
||||
test_check_sanitized_message_account_locks(1, false, true); // static key conflict
|
||||
test_check_sanitized_message_account_locks(3, false, true); // lookup key conflict
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_write_write_conflict() {
|
||||
let mut account_locks = ReadWriteAccountSet::default();
|
||||
let account = Pubkey::new_unique();
|
||||
assert!(account_locks.can_write(&account));
|
||||
account_locks.add_write(&account);
|
||||
assert!(!account_locks.can_write(&account));
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_read_write_conflict() {
|
||||
let mut account_locks = ReadWriteAccountSet::default();
|
||||
let account = Pubkey::new_unique();
|
||||
assert!(account_locks.can_read(&account));
|
||||
account_locks.add_read(&account);
|
||||
assert!(!account_locks.can_write(&account));
|
||||
assert!(account_locks.can_read(&account));
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_write_read_conflict() {
|
||||
let mut account_locks = ReadWriteAccountSet::default();
|
||||
let account = Pubkey::new_unique();
|
||||
assert!(account_locks.can_write(&account));
|
||||
account_locks.add_write(&account);
|
||||
assert!(!account_locks.can_write(&account));
|
||||
assert!(!account_locks.can_read(&account));
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_read_read_non_conflict() {
|
||||
let mut account_locks = ReadWriteAccountSet::default();
|
||||
let account = Pubkey::new_unique();
|
||||
assert!(account_locks.can_read(&account));
|
||||
account_locks.add_read(&account);
|
||||
assert!(account_locks.can_read(&account));
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_write_write_different_keys() {
|
||||
let mut account_locks = ReadWriteAccountSet::default();
|
||||
let account1 = Pubkey::new_unique();
|
||||
let account2 = Pubkey::new_unique();
|
||||
assert!(account_locks.can_write(&account1));
|
||||
account_locks.add_write(&account1);
|
||||
assert!(account_locks.can_write(&account2));
|
||||
assert!(account_locks.can_read(&account2));
|
||||
}
|
||||
}
|
|
@ -9,6 +9,7 @@ use {
|
|||
},
|
||||
leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker,
|
||||
multi_iterator_scanner::{MultiIteratorScanner, ProcessingDecision},
|
||||
read_write_account_set::ReadWriteAccountSet,
|
||||
unprocessed_packet_batches::{
|
||||
DeserializedPacket, PacketBatchInsertionMetrics, UnprocessedPacketBatches,
|
||||
},
|
||||
|
@ -18,13 +19,10 @@ use {
|
|||
solana_measure::measure,
|
||||
solana_runtime::bank::Bank,
|
||||
solana_sdk::{
|
||||
clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, pubkey::Pubkey,
|
||||
saturating_add_assign, transaction::SanitizedTransaction,
|
||||
},
|
||||
std::{
|
||||
collections::HashSet,
|
||||
sync::{atomic::Ordering, Arc},
|
||||
clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, saturating_add_assign,
|
||||
transaction::SanitizedTransaction,
|
||||
},
|
||||
std::sync::{atomic::Ordering, Arc},
|
||||
};
|
||||
|
||||
// Step-size set to be 64, equal to the maximum batch/entry size. With the
|
||||
|
@ -131,7 +129,7 @@ fn filter_processed_packets<'a, F>(
|
|||
/// multi-iterator checking function.
|
||||
pub struct ConsumeScannerPayload<'a> {
|
||||
pub reached_end_of_slot: bool,
|
||||
pub write_accounts: HashSet<Pubkey>,
|
||||
pub account_locks: ReadWriteAccountSet,
|
||||
pub sanitized_transactions: Vec<SanitizedTransaction>,
|
||||
pub slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker,
|
||||
}
|
||||
|
@ -149,18 +147,9 @@ fn consume_scan_should_process_packet(
|
|||
|
||||
// Before sanitization, let's quickly check the static keys (performance optimization)
|
||||
let message = &packet.transaction().get_message().message;
|
||||
let static_keys = message.static_account_keys();
|
||||
for key in static_keys.iter().enumerate().filter_map(|(idx, key)| {
|
||||
if message.is_maybe_writable(idx) {
|
||||
Some(key)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}) {
|
||||
if payload.write_accounts.contains(key) {
|
||||
if !payload.account_locks.check_static_account_locks(message) {
|
||||
return ProcessingDecision::Later;
|
||||
}
|
||||
}
|
||||
|
||||
// Try to deserialize the packet
|
||||
let (maybe_sanitized_transaction, sanitization_time) = measure!(
|
||||
|
@ -178,29 +167,19 @@ fn consume_scan_should_process_packet(
|
|||
if let Some(sanitized_transaction) = maybe_sanitized_transaction {
|
||||
let message = sanitized_transaction.message();
|
||||
|
||||
let conflicts_with_batch = message.account_keys().iter().enumerate().any(|(idx, key)| {
|
||||
if message.is_writable(idx) {
|
||||
payload.write_accounts.contains(key)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
|
||||
if conflicts_with_batch {
|
||||
ProcessingDecision::Later
|
||||
} else {
|
||||
message
|
||||
.account_keys()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.for_each(|(idx, key)| {
|
||||
if message.is_writable(idx) {
|
||||
payload.write_accounts.insert(*key);
|
||||
}
|
||||
});
|
||||
|
||||
// Check the number of locks and whether there are duplicates
|
||||
if SanitizedTransaction::validate_account_locks(
|
||||
message,
|
||||
bank.get_transaction_account_lock_limit(),
|
||||
)
|
||||
.is_err()
|
||||
{
|
||||
ProcessingDecision::Never
|
||||
} else if payload.account_locks.try_locking(message) {
|
||||
payload.sanitized_transactions.push(sanitized_transaction);
|
||||
ProcessingDecision::Now
|
||||
} else {
|
||||
ProcessingDecision::Later
|
||||
}
|
||||
} else {
|
||||
ProcessingDecision::Never
|
||||
|
@ -221,7 +200,7 @@ where
|
|||
{
|
||||
let payload = ConsumeScannerPayload {
|
||||
reached_end_of_slot: false,
|
||||
write_accounts: HashSet::new(),
|
||||
account_locks: ReadWriteAccountSet::default(),
|
||||
sanitized_transactions: Vec::with_capacity(UNPROCESSED_BUFFER_STEP_SIZE),
|
||||
slot_metrics_tracker,
|
||||
};
|
||||
|
|
|
@ -277,7 +277,7 @@ impl SanitizedTransaction {
|
|||
}
|
||||
|
||||
/// Validate a transaction message against locked accounts
|
||||
fn validate_account_locks(
|
||||
pub fn validate_account_locks(
|
||||
message: &SanitizedMessage,
|
||||
tx_account_lock_limit: usize,
|
||||
) -> Result<()> {
|
||||
|
|
Loading…
Reference in New Issue