handle write_version correctly in geyser api (#29224)

This commit is contained in:
Jeff Washington (jwash) 2022-12-13 08:42:28 -06:00 committed by GitHub
parent 637d0358ed
commit a57247a78e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 47 additions and 31 deletions

View File

@ -9,11 +9,12 @@ use {
solana_metrics::*, solana_metrics::*,
solana_runtime::{ solana_runtime::{
accounts_update_notifier_interface::AccountsUpdateNotifierInterface, accounts_update_notifier_interface::AccountsUpdateNotifierInterface,
append_vec::{StoredAccountMeta, StoredMeta}, append_vec::StoredAccountMeta,
}, },
solana_sdk::{ solana_sdk::{
account::{AccountSharedData, ReadableAccount}, account::{AccountSharedData, ReadableAccount},
clock::Slot, clock::Slot,
pubkey::Pubkey,
signature::Signature, signature::Signature,
}, },
std::sync::{Arc, RwLock}, std::sync::{Arc, RwLock},
@ -27,12 +28,13 @@ impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl {
fn notify_account_update( fn notify_account_update(
&self, &self,
slot: Slot, slot: Slot,
meta: &StoredMeta,
account: &AccountSharedData, account: &AccountSharedData,
txn_signature: &Option<&Signature>, txn_signature: &Option<&Signature>,
pubkey: &Pubkey,
write_version: u64,
) { ) {
if let Some(account_info) = if let Some(account_info) =
self.accountinfo_from_shared_account_data(meta, account, txn_signature) self.accountinfo_from_shared_account_data(account, txn_signature, pubkey, write_version)
{ {
self.notify_plugins_of_account_update(account_info, slot, false); self.notify_plugins_of_account_update(account_info, slot, false);
} }
@ -104,18 +106,19 @@ impl AccountsUpdateNotifierImpl {
fn accountinfo_from_shared_account_data<'a>( fn accountinfo_from_shared_account_data<'a>(
&self, &self,
meta: &'a StoredMeta,
account: &'a AccountSharedData, account: &'a AccountSharedData,
txn_signature: &'a Option<&'a Signature>, txn_signature: &'a Option<&'a Signature>,
pubkey: &'a Pubkey,
write_version: u64,
) -> Option<ReplicaAccountInfoV2<'a>> { ) -> Option<ReplicaAccountInfoV2<'a>> {
Some(ReplicaAccountInfoV2 { Some(ReplicaAccountInfoV2 {
pubkey: meta.pubkey.as_ref(), pubkey: pubkey.as_ref(),
lamports: account.lamports(), lamports: account.lamports(),
owner: account.owner().as_ref(), owner: account.owner().as_ref(),
executable: account.executable(), executable: account.executable(),
rent_epoch: account.rent_epoch(), rent_epoch: account.rent_epoch(),
data: account.data(), data: account.data(),
write_version: meta.write_version, write_version,
txn_signature: *txn_signature, txn_signature: *txn_signature,
}) })
} }

View File

@ -42,7 +42,7 @@ use {
}, },
append_vec::{ append_vec::{
aligned_stored_size, AppendVec, StorableAccountsWithHashesAndWriteVersions, aligned_stored_size, AppendVec, StorableAccountsWithHashesAndWriteVersions,
StoredAccountMeta, StoredMeta, StoredMetaWriteVersion, APPEND_VEC_MMAPPED_FILES_OPEN, StoredAccountMeta, StoredMetaWriteVersion, APPEND_VEC_MMAPPED_FILES_OPEN,
STORE_META_OVERHEAD, STORE_META_OVERHEAD,
}, },
cache_hash_data::{CacheHashData, CacheHashDataFile}, cache_hash_data::{CacheHashData, CacheHashDataFile},
@ -6614,13 +6614,17 @@ impl AccountsDb {
} }
} }
fn write_accounts_to_cache<'a, 'b, T: ReadableAccount + Sync>( fn write_accounts_to_cache<'a, 'b, T: ReadableAccount + Sync, P>(
&self, &self,
slot: Slot, slot: Slot,
accounts_and_meta_to_store: &impl StorableAccounts<'b, T>, accounts_and_meta_to_store: &impl StorableAccounts<'b, T>,
txn_signatures_iter: Box<dyn std::iter::Iterator<Item = &Option<&Signature>> + 'a>, txn_signatures_iter: Box<dyn std::iter::Iterator<Item = &Option<&Signature>> + 'a>,
include_slot_in_hash: IncludeSlotInHash, include_slot_in_hash: IncludeSlotInHash,
) -> Vec<AccountInfo> { mut write_version_producer: P,
) -> Vec<AccountInfo>
where
P: Iterator<Item = u64>,
{
txn_signatures_iter txn_signatures_iter
.enumerate() .enumerate()
.map(|(i, signature)| { .map(|(i, signature)| {
@ -6634,13 +6638,13 @@ impl AccountsDb {
account.lamports(), account.lamports(),
); );
let meta = StoredMeta { self.notify_account_at_accounts_update(
pubkey: *accounts_and_meta_to_store.pubkey(i), slot,
data_len: account.data().len() as u64, &account,
write_version: 0, signature,
}; accounts_and_meta_to_store.pubkey(i),
&mut write_version_producer,
self.notify_account_at_accounts_update(slot, &meta, &account, signature); );
let cached_account = self.accounts_cache.store( let cached_account = self.accounts_cache.store(
slot, slot,
@ -6703,6 +6707,7 @@ impl AccountsDb {
accounts, accounts,
signature_iter, signature_iter,
accounts.include_slot_in_hash(), accounts.include_slot_in_hash(),
write_version_producer,
) )
} else if accounts.has_hash_and_write_version() { } else if accounts.has_hash_and_write_version() {
self.write_accounts_to_storage( self.write_accounts_to_storage(
@ -9657,7 +9662,7 @@ pub mod tests {
accounts_index::{ accounts_index::{
tests::*, AccountSecondaryIndexesIncludeExclude, ReadAccountMapEntry, RefCount, tests::*, AccountSecondaryIndexesIncludeExclude, ReadAccountMapEntry, RefCount,
}, },
append_vec::{test_utils::TempFile, AccountMeta}, append_vec::{test_utils::TempFile, AccountMeta, StoredMeta},
cache_hash_data_stats::CacheHashDataStats, cache_hash_data_stats::CacheHashDataStats,
inline_spl_token, inline_spl_token,
}, },

View File

@ -1,8 +1,5 @@
use { use {
crate::{ crate::{accounts_db::AccountsDb, append_vec::StoredAccountMeta},
accounts_db::AccountsDb,
append_vec::{StoredAccountMeta, StoredMeta},
},
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_metrics::*, solana_metrics::*,
solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey, signature::Signature}, solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey, signature::Signature},
@ -59,16 +56,25 @@ impl AccountsDb {
notify_stats.report(); notify_stats.report();
} }
pub fn notify_account_at_accounts_update( pub fn notify_account_at_accounts_update<P>(
&self, &self,
slot: Slot, slot: Slot,
meta: &StoredMeta,
account: &AccountSharedData, account: &AccountSharedData,
txn_signature: &Option<&Signature>, txn_signature: &Option<&Signature>,
) { pubkey: &Pubkey,
write_version_producer: &mut P,
) where
P: Iterator<Item = u64>,
{
if let Some(accounts_update_notifier) = &self.accounts_update_notifier { if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
let notifier = &accounts_update_notifier.read().unwrap(); let notifier = &accounts_update_notifier.read().unwrap();
notifier.notify_account_update(slot, meta, account, txn_signature); notifier.notify_account_update(
slot,
account,
txn_signature,
pubkey,
write_version_producer.next().unwrap(),
);
} }
} }
@ -154,7 +160,7 @@ pub mod tests {
accounts_update_notifier_interface::{ accounts_update_notifier_interface::{
AccountsUpdateNotifier, AccountsUpdateNotifierInterface, AccountsUpdateNotifier, AccountsUpdateNotifierInterface,
}, },
append_vec::{StoredAccountMeta, StoredMeta}, append_vec::StoredAccountMeta,
}, },
dashmap::DashMap, dashmap::DashMap,
solana_sdk::{ solana_sdk::{
@ -186,12 +192,13 @@ pub mod tests {
fn notify_account_update( fn notify_account_update(
&self, &self,
slot: Slot, slot: Slot,
meta: &StoredMeta,
account: &AccountSharedData, account: &AccountSharedData,
_txn_signature: &Option<&Signature>, _txn_signature: &Option<&Signature>,
pubkey: &Pubkey,
_write_version: u64,
) { ) {
self.accounts_notified self.accounts_notified
.entry(meta.pubkey) .entry(*pubkey)
.or_default() .or_default()
.push((slot, account.clone())); .push((slot, account.clone()));
} }

View File

@ -1,6 +1,6 @@
use { use {
crate::append_vec::{StoredAccountMeta, StoredMeta}, crate::append_vec::StoredAccountMeta,
solana_sdk::{account::AccountSharedData, clock::Slot, signature::Signature}, solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey, signature::Signature},
std::sync::{Arc, RwLock}, std::sync::{Arc, RwLock},
}; };
@ -9,9 +9,10 @@ pub trait AccountsUpdateNotifierInterface: std::fmt::Debug {
fn notify_account_update( fn notify_account_update(
&self, &self,
slot: Slot, slot: Slot,
meta: &StoredMeta,
account: &AccountSharedData, account: &AccountSharedData,
txn_signature: &Option<&Signature>, txn_signature: &Option<&Signature>,
pubkey: &Pubkey,
write_version: u64,
); );
/// Notified when the AccountsDb is initialized at start when restored /// Notified when the AccountsDb is initialized at start when restored