Replace txn_signature in ReplicaAccountInfo with transaction (#30189)

#### Current implementation of ReplicaAccountInfo restricts
possibilities for inflight account filtration
Current implementation includes transaction signature in
ReplicaAccountInfo. This approach does not allow to filter accounts by
matching other accounts in transaction in-flight (e. g. accept only
those accounts included in transactions for specific programs). Current
implementation forces to collect ALL accounts and transactions for some
period of time and perform such complex filtration afterwards.

#### Pass reference to transaction object instead of transaction
signature into ReplicaAccountInfo
Advanced in-flight filtration can be implemented in plugins by passing
reference to transaction for every update_account event. This change
doesn't bring any overhead comparing to current implementation (only
data type of reference is changed) and brings only minor changes in
source code.
This commit is contained in:
ivandzen 2023-02-10 00:00:33 +04:00 committed by GitHub
parent da34f2edd0
commit 2f9146e8c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 89 additions and 64 deletions

View File

@ -71,6 +71,39 @@ pub struct ReplicaAccountInfoV2<'a> {
pub txn_signature: Option<&'a Signature>, pub txn_signature: Option<&'a Signature>,
} }
#[derive(Debug, Clone, PartialEq, Eq)]
/// Information about an account being updated
/// (extended with reference to transaction doing this update)
pub struct ReplicaAccountInfoV3<'a> {
/// The Pubkey for the account
pub pubkey: &'a [u8],
/// The lamports for the account
pub lamports: u64,
/// The Pubkey of the owner program account
pub owner: &'a [u8],
/// This account's data contains a loaded program (and is now read-only)
pub executable: bool,
/// The epoch at which this account will next owe rent
pub rent_epoch: u64,
/// The data held in this account.
pub data: &'a [u8],
/// A global monotonically increasing atomic number, which can be used
/// to tell the order of the account update. For example, when an
/// account is updated in the same slot multiple times, the update
/// with higher write_version should supersede the one with lower
/// write_version.
pub write_version: u64,
/// Reference to transaction causing this account modification
pub txn: Option<&'a SanitizedTransaction>,
}
/// A wrapper to future-proof ReplicaAccountInfo handling. /// A wrapper to future-proof ReplicaAccountInfo handling.
/// If there were a change to the structure of ReplicaAccountInfo, /// If there were a change to the structure of ReplicaAccountInfo,
/// there would be new enum entry for the newer version, forcing /// there would be new enum entry for the newer version, forcing
@ -78,6 +111,7 @@ pub struct ReplicaAccountInfoV2<'a> {
pub enum ReplicaAccountInfoVersions<'a> { pub enum ReplicaAccountInfoVersions<'a> {
V0_0_1(&'a ReplicaAccountInfo<'a>), V0_0_1(&'a ReplicaAccountInfo<'a>),
V0_0_2(&'a ReplicaAccountInfoV2<'a>), V0_0_2(&'a ReplicaAccountInfoV2<'a>),
V0_0_3(&'a ReplicaAccountInfoV3<'a>),
} }
/// Information about a transaction /// Information about a transaction

View File

@ -3,7 +3,7 @@ use {
crate::geyser_plugin_manager::GeyserPluginManager, crate::geyser_plugin_manager::GeyserPluginManager,
log::*, log::*,
solana_geyser_plugin_interface::geyser_plugin_interface::{ solana_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaAccountInfoV2, ReplicaAccountInfoVersions, ReplicaAccountInfoV3, ReplicaAccountInfoVersions,
}, },
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_metrics::*, solana_metrics::*,
@ -15,7 +15,7 @@ use {
account::{AccountSharedData, ReadableAccount}, account::{AccountSharedData, ReadableAccount},
clock::Slot, clock::Slot,
pubkey::Pubkey, pubkey::Pubkey,
signature::Signature, transaction::SanitizedTransaction,
}, },
std::sync::{Arc, RwLock}, std::sync::{Arc, RwLock},
}; };
@ -29,12 +29,12 @@ impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl {
&self, &self,
slot: Slot, slot: Slot,
account: &AccountSharedData, account: &AccountSharedData,
txn_signature: &Option<&Signature>, txn: &Option<&SanitizedTransaction>,
pubkey: &Pubkey, pubkey: &Pubkey,
write_version: u64, write_version: u64,
) { ) {
if let Some(account_info) = if let Some(account_info) =
self.accountinfo_from_shared_account_data(account, txn_signature, pubkey, write_version) self.accountinfo_from_shared_account_data(account, txn, pubkey, write_version)
{ {
self.notify_plugins_of_account_update(account_info, slot, false); self.notify_plugins_of_account_update(account_info, slot, false);
} }
@ -107,11 +107,11 @@ impl AccountsUpdateNotifierImpl {
fn accountinfo_from_shared_account_data<'a>( fn accountinfo_from_shared_account_data<'a>(
&self, &self,
account: &'a AccountSharedData, account: &'a AccountSharedData,
txn_signature: &'a Option<&'a Signature>, txn: &'a Option<&'a SanitizedTransaction>,
pubkey: &'a Pubkey, pubkey: &'a Pubkey,
write_version: u64, write_version: u64,
) -> Option<ReplicaAccountInfoV2<'a>> { ) -> Option<ReplicaAccountInfoV3<'a>> {
Some(ReplicaAccountInfoV2 { Some(ReplicaAccountInfoV3 {
pubkey: pubkey.as_ref(), pubkey: pubkey.as_ref(),
lamports: account.lamports(), lamports: account.lamports(),
owner: account.owner().as_ref(), owner: account.owner().as_ref(),
@ -119,15 +119,15 @@ impl AccountsUpdateNotifierImpl {
rent_epoch: account.rent_epoch(), rent_epoch: account.rent_epoch(),
data: account.data(), data: account.data(),
write_version, write_version,
txn_signature: *txn_signature, txn: *txn,
}) })
} }
fn accountinfo_from_stored_account_meta<'a>( fn accountinfo_from_stored_account_meta<'a>(
&self, &self,
stored_account_meta: &'a StoredAccountMeta, stored_account_meta: &'a StoredAccountMeta,
) -> Option<ReplicaAccountInfoV2<'a>> { ) -> Option<ReplicaAccountInfoV3<'a>> {
Some(ReplicaAccountInfoV2 { Some(ReplicaAccountInfoV3 {
pubkey: stored_account_meta.pubkey().as_ref(), pubkey: stored_account_meta.pubkey().as_ref(),
lamports: stored_account_meta.account_meta.lamports, lamports: stored_account_meta.account_meta.lamports,
owner: stored_account_meta.account_meta.owner.as_ref(), owner: stored_account_meta.account_meta.owner.as_ref(),
@ -135,13 +135,13 @@ impl AccountsUpdateNotifierImpl {
rent_epoch: stored_account_meta.account_meta.rent_epoch, rent_epoch: stored_account_meta.account_meta.rent_epoch,
data: stored_account_meta.data, data: stored_account_meta.data,
write_version: stored_account_meta.meta.write_version_obsolete, write_version: stored_account_meta.meta.write_version_obsolete,
txn_signature: None, txn: None,
}) })
} }
fn notify_plugins_of_account_update( fn notify_plugins_of_account_update(
&self, &self,
account: ReplicaAccountInfoV2, account: ReplicaAccountInfoV3,
slot: Slot, slot: Slot,
is_startup: bool, is_startup: bool,
) { ) {
@ -154,7 +154,7 @@ impl AccountsUpdateNotifierImpl {
for plugin in plugin_manager.plugins.iter_mut() { for plugin in plugin_manager.plugins.iter_mut() {
let mut measure = Measure::start("geyser-plugin-update-account"); let mut measure = Measure::start("geyser-plugin-update-account");
match plugin.update_account( match plugin.update_account(
ReplicaAccountInfoVersions::V0_0_2(&account), ReplicaAccountInfoVersions::V0_0_3(&account),
slot, slot,
is_startup, is_startup,
) { ) {

View File

@ -47,7 +47,6 @@ use {
}, },
pubkey::Pubkey, pubkey::Pubkey,
saturating_add_assign, saturating_add_assign,
signature::Signature,
slot_hashes::SlotHashes, slot_hashes::SlotHashes,
system_program, system_program,
sysvar::{self, epoch_schedule::EpochSchedule, instructions::construct_instructions_data}, sysvar::{self, epoch_schedule::EpochSchedule, instructions::construct_instructions_data},
@ -1203,7 +1202,7 @@ impl Accounts {
lamports_per_signature: u64, lamports_per_signature: u64,
include_slot_in_hash: IncludeSlotInHash, include_slot_in_hash: IncludeSlotInHash,
) { ) {
let (accounts_to_store, txn_signatures) = self.collect_accounts_to_store( let (accounts_to_store, transactions) = self.collect_accounts_to_store(
txs, txs,
res, res,
loaded, loaded,
@ -1213,7 +1212,7 @@ impl Accounts {
); );
self.accounts_db.store_cached( self.accounts_db.store_cached(
(slot, &accounts_to_store[..], include_slot_in_hash), (slot, &accounts_to_store[..], include_slot_in_hash),
Some(&txn_signatures), Some(&transactions),
); );
} }
@ -1240,10 +1239,10 @@ impl Accounts {
lamports_per_signature: u64, lamports_per_signature: u64,
) -> ( ) -> (
Vec<(&'a Pubkey, &'a AccountSharedData)>, Vec<(&'a Pubkey, &'a AccountSharedData)>,
Vec<Option<&'a Signature>>, Vec<Option<&'a SanitizedTransaction>>,
) { ) {
let mut accounts = Vec::with_capacity(load_results.len()); let mut accounts = Vec::with_capacity(load_results.len());
let mut signatures = Vec::with_capacity(load_results.len()); let mut transactions = Vec::with_capacity(load_results.len());
for (i, ((tx_load_result, nonce), tx)) in load_results.iter_mut().zip(txs).enumerate() { for (i, ((tx_load_result, nonce), tx)) in load_results.iter_mut().zip(txs).enumerate() {
if tx_load_result.is_err() { if tx_load_result.is_err() {
// Don't store any accounts if tx failed to load // Don't store any accounts if tx failed to load
@ -1293,12 +1292,12 @@ impl Accounts {
if execution_status.is_ok() || is_nonce_account || is_fee_payer { if execution_status.is_ok() || is_nonce_account || is_fee_payer {
// Add to the accounts to store // Add to the accounts to store
accounts.push((&*address, &*account)); accounts.push((&*address, &*account));
signatures.push(Some(tx.signature())); transactions.push(Some(tx));
} }
} }
} }
} }
(accounts, signatures) (accounts, transactions)
} }
} }
@ -2874,7 +2873,6 @@ mod tests {
(message.account_keys[1], account2.clone()), (message.account_keys[1], account2.clone()),
]; ];
let tx0 = new_sanitized_tx(&[&keypair0], message, Hash::default()); let tx0 = new_sanitized_tx(&[&keypair0], message, Hash::default());
let tx0_sign = *tx0.signature();
let instructions = vec![CompiledInstruction::new(2, &(), vec![0, 1])]; let instructions = vec![CompiledInstruction::new(2, &(), vec![0, 1])];
let message = Message::new_with_compiled_instructions( let message = Message::new_with_compiled_instructions(
@ -2890,7 +2888,6 @@ mod tests {
(message.account_keys[1], account2), (message.account_keys[1], account2),
]; ];
let tx1 = new_sanitized_tx(&[&keypair1], message, Hash::default()); let tx1 = new_sanitized_tx(&[&keypair1], message, Hash::default());
let tx1_sign = *tx1.signature();
let loaded0 = ( let loaded0 = (
Ok(LoadedTransaction { Ok(LoadedTransaction {
@ -2927,9 +2924,9 @@ mod tests {
.unwrap() .unwrap()
.insert_new_readonly(&pubkey); .insert_new_readonly(&pubkey);
} }
let txs = vec![tx0, tx1]; let txs = vec![tx0.clone(), tx1.clone()];
let execution_results = vec![new_execution_result(Ok(()), None); 2]; let execution_results = vec![new_execution_result(Ok(()), None); 2];
let (collected_accounts, txn_signatures) = accounts.collect_accounts_to_store( let (collected_accounts, transactions) = accounts.collect_accounts_to_store(
&txs, &txs,
&execution_results, &execution_results,
loaded.as_mut_slice(), loaded.as_mut_slice(),
@ -2945,13 +2942,9 @@ mod tests {
.iter() .iter()
.any(|(pubkey, _account)| *pubkey == &keypair1.pubkey())); .any(|(pubkey, _account)| *pubkey == &keypair1.pubkey()));
assert_eq!(txn_signatures.len(), 2); assert_eq!(transactions.len(), 2);
assert!(txn_signatures assert!(transactions.iter().any(|txn| txn.unwrap().eq(&tx0)));
.iter() assert!(transactions.iter().any(|txn| txn.unwrap().eq(&tx1)));
.any(|signature| signature.unwrap().to_string().eq(&tx0_sign.to_string())));
assert!(txn_signatures
.iter()
.any(|signature| signature.unwrap().to_string().eq(&tx1_sign.to_string())));
// Ensure readonly_lock reflects lock // Ensure readonly_lock reflects lock
assert_eq!( assert_eq!(

View File

@ -79,8 +79,8 @@ use {
pubkey::Pubkey, pubkey::Pubkey,
rent::Rent, rent::Rent,
saturating_add_assign, saturating_add_assign,
signature::Signature,
timing::AtomicInterval, timing::AtomicInterval,
transaction::SanitizedTransaction,
}, },
std::{ std::{
borrow::{Borrow, Cow}, borrow::{Borrow, Cow},
@ -6571,16 +6571,16 @@ impl AccountsDb {
&self, &self,
slot: Slot, slot: Slot,
accounts_and_meta_to_store: &impl StorableAccounts<'b, T>, accounts_and_meta_to_store: &impl StorableAccounts<'b, T>,
txn_signatures_iter: Box<dyn std::iter::Iterator<Item = &Option<&Signature>> + 'a>, txn_iter: Box<dyn std::iter::Iterator<Item = &Option<&SanitizedTransaction>> + 'a>,
include_slot_in_hash: IncludeSlotInHash, include_slot_in_hash: IncludeSlotInHash,
mut write_version_producer: P, mut write_version_producer: P,
) -> Vec<AccountInfo> ) -> Vec<AccountInfo>
where where
P: Iterator<Item = u64>, P: Iterator<Item = u64>,
{ {
txn_signatures_iter txn_iter
.enumerate() .enumerate()
.map(|(i, signature)| { .map(|(i, txn)| {
let account = accounts_and_meta_to_store let account = accounts_and_meta_to_store
.account_default_if_zero_lamport(i) .account_default_if_zero_lamport(i)
.map(|account| account.to_account_shared_data()) .map(|account| account.to_account_shared_data())
@ -6594,7 +6594,7 @@ impl AccountsDb {
self.notify_account_at_accounts_update( self.notify_account_at_accounts_update(
slot, slot,
&account, &account,
signature, txn,
accounts_and_meta_to_store.pubkey(i), accounts_and_meta_to_store.pubkey(i),
&mut write_version_producer, &mut write_version_producer,
); );
@ -6630,7 +6630,7 @@ impl AccountsDb {
hashes: Option<Vec<impl Borrow<Hash>>>, hashes: Option<Vec<impl Borrow<Hash>>>,
mut write_version_producer: P, mut write_version_producer: P,
store_to: &StoreTo, store_to: &StoreTo,
txn_signatures: Option<&[Option<&'a Signature>]>, transactions: Option<&[Option<&'a SanitizedTransaction>]>,
) -> Vec<AccountInfo> { ) -> Vec<AccountInfo> {
let mut calc_stored_meta_time = Measure::start("calc_stored_meta"); let mut calc_stored_meta_time = Measure::start("calc_stored_meta");
let slot = accounts.target_slot(); let slot = accounts.target_slot();
@ -6645,11 +6645,11 @@ impl AccountsDb {
match store_to { match store_to {
StoreTo::Cache => { StoreTo::Cache => {
let signature_iter: Box<dyn std::iter::Iterator<Item = &Option<&Signature>>> = let txn_iter: Box<dyn std::iter::Iterator<Item = &Option<&SanitizedTransaction>>> =
match txn_signatures { match transactions {
Some(txn_signatures) => { Some(transactions) => {
assert_eq!(txn_signatures.len(), accounts.len()); assert_eq!(transactions.len(), accounts.len());
Box::new(txn_signatures.iter()) Box::new(transactions.iter())
} }
None => Box::new(std::iter::repeat(&None).take(accounts.len())), None => Box::new(std::iter::repeat(&None).take(accounts.len())),
}; };
@ -6657,7 +6657,7 @@ impl AccountsDb {
self.write_accounts_to_cache( self.write_accounts_to_cache(
slot, slot,
accounts, accounts,
signature_iter, txn_iter,
accounts.include_slot_in_hash(), accounts.include_slot_in_hash(),
write_version_producer, write_version_producer,
) )
@ -8161,12 +8161,12 @@ impl AccountsDb {
pub fn store_cached<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>( pub fn store_cached<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>(
&self, &self,
accounts: impl StorableAccounts<'a, T>, accounts: impl StorableAccounts<'a, T>,
txn_signatures: Option<&'a [Option<&'a Signature>]>, transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
) { ) {
self.store( self.store(
accounts, accounts,
&StoreTo::Cache, &StoreTo::Cache,
txn_signatures, transactions,
StoreReclaims::Default, StoreReclaims::Default,
); );
} }
@ -8187,7 +8187,7 @@ impl AccountsDb {
&self, &self,
accounts: impl StorableAccounts<'a, T>, accounts: impl StorableAccounts<'a, T>,
store_to: &StoreTo, store_to: &StoreTo,
txn_signatures: Option<&'a [Option<&'a Signature>]>, transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
reclaim: StoreReclaims, reclaim: StoreReclaims,
) { ) {
// If all transactions in a batch are errored, // If all transactions in a batch are errored,
@ -8219,13 +8219,7 @@ impl AccountsDb {
} }
// we use default hashes for now since the same account may be stored to the cache multiple times // we use default hashes for now since the same account may be stored to the cache multiple times
self.store_accounts_unfrozen( self.store_accounts_unfrozen(accounts, None::<Vec<Hash>>, store_to, transactions, reclaim);
accounts,
None::<Vec<Hash>>,
store_to,
txn_signatures,
reclaim,
);
self.report_store_timings(); self.report_store_timings();
} }
@ -8352,7 +8346,7 @@ impl AccountsDb {
accounts: impl StorableAccounts<'a, T>, accounts: impl StorableAccounts<'a, T>,
hashes: Option<Vec<impl Borrow<Hash>>>, hashes: Option<Vec<impl Borrow<Hash>>>,
store_to: &StoreTo, store_to: &StoreTo,
txn_signatures: Option<&'a [Option<&'a Signature>]>, transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
reclaim: StoreReclaims, reclaim: StoreReclaims,
) { ) {
// This path comes from a store to a non-frozen slot. // This path comes from a store to a non-frozen slot.
@ -8369,7 +8363,7 @@ impl AccountsDb {
None::<Box<dyn Iterator<Item = u64>>>, None::<Box<dyn Iterator<Item = u64>>>,
store_to, store_to,
reset_accounts, reset_accounts,
txn_signatures, transactions,
reclaim, reclaim,
); );
} }
@ -8404,7 +8398,7 @@ impl AccountsDb {
write_version_producer: Option<Box<dyn Iterator<Item = u64>>>, write_version_producer: Option<Box<dyn Iterator<Item = u64>>>,
store_to: &StoreTo, store_to: &StoreTo,
reset_accounts: bool, reset_accounts: bool,
txn_signatures: Option<&[Option<&Signature>]>, transactions: Option<&[Option<&SanitizedTransaction>]>,
reclaim: StoreReclaims, reclaim: StoreReclaims,
) -> StoreAccountsTiming { ) -> StoreAccountsTiming {
let write_version_producer: Box<dyn Iterator<Item = u64>> = write_version_producer let write_version_producer: Box<dyn Iterator<Item = u64>> = write_version_producer
@ -8426,7 +8420,7 @@ impl AccountsDb {
hashes, hashes,
write_version_producer, write_version_producer,
store_to, store_to,
txn_signatures, transactions,
); );
store_accounts_time.stop(); store_accounts_time.stop();
self.stats self.stats

View File

@ -5,7 +5,9 @@ use {
}, },
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_metrics::*, solana_metrics::*,
solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey, signature::Signature}, solana_sdk::{
account::AccountSharedData, clock::Slot, pubkey::Pubkey, transaction::SanitizedTransaction,
},
std::collections::{hash_map::Entry, HashMap, HashSet}, std::collections::{hash_map::Entry, HashMap, HashSet},
}; };
@ -63,7 +65,7 @@ impl AccountsDb {
&self, &self,
slot: Slot, slot: Slot,
account: &AccountSharedData, account: &AccountSharedData,
txn_signature: &Option<&Signature>, txn: &Option<&SanitizedTransaction>,
pubkey: &Pubkey, pubkey: &Pubkey,
write_version_producer: &mut P, write_version_producer: &mut P,
) where ) where
@ -74,7 +76,7 @@ impl AccountsDb {
notifier.notify_account_update( notifier.notify_account_update(
slot, slot,
account, account,
txn_signature, txn,
pubkey, pubkey,
write_version_producer.next().unwrap(), write_version_producer.next().unwrap(),
); );
@ -177,7 +179,7 @@ pub mod tests {
account::{AccountSharedData, ReadableAccount}, account::{AccountSharedData, ReadableAccount},
clock::Slot, clock::Slot,
pubkey::Pubkey, pubkey::Pubkey,
signature::Signature, transaction::SanitizedTransaction,
}, },
std::sync::{ std::sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
@ -203,7 +205,7 @@ pub mod tests {
&self, &self,
slot: Slot, slot: Slot,
account: &AccountSharedData, account: &AccountSharedData,
_txn_signature: &Option<&Signature>, _txn: &Option<&SanitizedTransaction>,
pubkey: &Pubkey, pubkey: &Pubkey,
_write_version: u64, _write_version: u64,
) { ) {

View File

@ -1,6 +1,8 @@
use { use {
crate::append_vec::StoredAccountMeta, crate::append_vec::StoredAccountMeta,
solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey, signature::Signature}, solana_sdk::{
account::AccountSharedData, clock::Slot, pubkey::Pubkey, transaction::SanitizedTransaction,
},
std::sync::{Arc, RwLock}, std::sync::{Arc, RwLock},
}; };
@ -10,7 +12,7 @@ pub trait AccountsUpdateNotifierInterface: std::fmt::Debug {
&self, &self,
slot: Slot, slot: Slot,
account: &AccountSharedData, account: &AccountSharedData,
txn_signature: &Option<&Signature>, txn: &Option<&SanitizedTransaction>,
pubkey: &Pubkey, pubkey: &Pubkey,
write_version: u64, write_version: u64,
); );