Geyser notification on account deletion by creating account notifcation with previous account states

This commit is contained in:
godmodegalactus 2024-04-18 13:09:04 +02:00
parent b9ff810894
commit 4876399a05
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
10 changed files with 314 additions and 128 deletions

View File

@ -974,7 +974,9 @@ impl Accounts {
.map(|(pubkey, account, _slot)| (*pubkey, account))
{
let src = bincode::serialize(&account).unwrap();
let compressed = lz4::block::compress(&src, Some(lz4::block::CompressionMode::FAST(3)), true).unwrap();
let compressed =
lz4::block::compress(&src, Some(lz4::block::CompressionMode::FAST(3)), true)
.unwrap();
collector.push((pk, compressed))
}
}
@ -1050,9 +1052,11 @@ impl Accounts {
ancestors,
bank_id,
|some_account_tuple| {
Self::load_while_filtering_compressed(&mut collector, some_account_tuple, |account| {
account.owner() == program_id && filter(account)
})
Self::load_while_filtering_compressed(
&mut collector,
some_account_tuple,
|account| account.owner() == program_id && filter(account),
)
},
config,
)
@ -1181,31 +1185,35 @@ impl Accounts {
bank_id,
*index_key,
|some_account_tuple| {
Self::load_while_filtering_compressed(&mut collector, some_account_tuple, |account| {
if just_get_program_ids {
if Self::accumulate_and_check_scan_result_size(
&sum,
account,
&byte_limit_for_scan,
) {
config.abort();
}
true
} else {
let use_account = filter(account);
if use_account
&& Self::accumulate_and_check_scan_result_size(
Self::load_while_filtering_compressed(
&mut collector,
some_account_tuple,
|account| {
if just_get_program_ids {
if Self::accumulate_and_check_scan_result_size(
&sum,
account,
&byte_limit_for_scan,
)
{
// total size of results exceeds size limit, so abort scan
config.abort();
) {
config.abort();
}
true
} else {
let use_account = filter(account);
if use_account
&& Self::accumulate_and_check_scan_result_size(
&sum,
account,
&byte_limit_for_scan,
)
{
// total size of results exceeds size limit, so abort scan
config.abort();
}
use_account
}
use_account
}
});
},
);
},
&config,
just_get_program_ids,
@ -1429,6 +1437,7 @@ impl Accounts {
durable_nonce: &DurableNonce,
lamports_per_signature: u64,
include_slot_in_hash: IncludeSlotInHash,
ancestors: Option<&Ancestors>,
) {
let (accounts_to_store, transactions) = self.collect_accounts_to_store(
txs,
@ -1441,6 +1450,7 @@ impl Accounts {
self.accounts_db.store_cached_inline_update_index(
(slot, &accounts_to_store[..], include_slot_in_hash),
Some(&transactions),
ancestors,
);
}
@ -1448,7 +1458,7 @@ impl Accounts {
&self,
accounts: impl StorableAccounts<'a, T>,
) {
self.accounts_db.store_cached(accounts, None)
self.accounts_db.store_cached(accounts, None, None)
}
/// Add a slot to root. Root slots cannot be purged

View File

@ -6792,6 +6792,7 @@ impl AccountsDb {
txn_iter: Box<dyn std::iter::Iterator<Item = &Option<&SanitizedTransaction>> + 'a>,
include_slot_in_hash: IncludeSlotInHash,
mut write_version_producer: P,
ancestors: Option<&Ancestors>,
) -> Vec<AccountInfo>
where
P: Iterator<Item = u64>,
@ -6803,19 +6804,22 @@ impl AccountsDb {
.account_default_if_zero_lamport(i)
.map(|account| account.to_account_shared_data())
.unwrap_or_default();
let pubkey = accounts_and_meta_to_store.pubkey(i);
let account_info = AccountInfo::new(StorageLocation::Cached, account.lamports());
self.notify_account_at_accounts_update(
slot,
&account,
txn,
accounts_and_meta_to_store.pubkey(i),
pubkey,
&mut write_version_producer,
ancestors,
);
let cached_account = self.accounts_cache.store(
slot,
accounts_and_meta_to_store.pubkey(i),
pubkey,
account,
None::<&Hash>,
include_slot_in_hash,
@ -6845,6 +6849,7 @@ impl AccountsDb {
mut write_version_producer: P,
store_to: &StoreTo,
transactions: Option<&[Option<&'a SanitizedTransaction>]>,
ancestors: Option<&Ancestors>,
) -> Vec<AccountInfo> {
let mut calc_stored_meta_time = Measure::start("calc_stored_meta");
let slot = accounts.target_slot();
@ -6874,6 +6879,7 @@ impl AccountsDb {
txn_iter,
accounts.include_slot_in_hash(),
write_version_producer,
ancestors,
)
}
StoreTo::Storage(storage) => {
@ -8500,6 +8506,7 @@ impl AccountsDb {
&self,
accounts: impl StorableAccounts<'a, T>,
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
ancestors: Option<&Ancestors>,
) {
self.store(
accounts,
@ -8507,6 +8514,7 @@ impl AccountsDb {
transactions,
StoreReclaims::Default,
UpdateIndexThreadSelection::PoolWithThreshold,
ancestors,
);
}
@ -8517,6 +8525,7 @@ impl AccountsDb {
&self,
accounts: impl StorableAccounts<'a, T>,
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
ancestors: Option<&Ancestors>,
) {
self.store(
accounts,
@ -8524,6 +8533,7 @@ impl AccountsDb {
transactions,
StoreReclaims::Default,
UpdateIndexThreadSelection::Inline,
ancestors,
);
}
@ -8537,6 +8547,7 @@ impl AccountsDb {
None,
StoreReclaims::Default,
UpdateIndexThreadSelection::PoolWithThreshold,
None,
);
}
@ -8547,6 +8558,7 @@ impl AccountsDb {
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
ancestors: Option<&Ancestors>,
) {
// If all transactions in a batch are errored,
// it's possible to get a store with no accounts.
@ -8584,6 +8596,7 @@ impl AccountsDb {
transactions,
reclaim,
update_index_thread_selection,
ancestors,
);
self.report_store_timings();
}
@ -8738,6 +8751,7 @@ impl AccountsDb {
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
ancestors: Option<&Ancestors>,
) {
// This path comes from a store to a non-frozen slot.
// If a store is dead here, then a newer update for
@ -8756,6 +8770,7 @@ impl AccountsDb {
transactions,
reclaim,
update_index_thread_selection,
ancestors,
);
}
@ -8780,9 +8795,11 @@ impl AccountsDb {
None,
reclaim,
UpdateIndexThreadSelection::PoolWithThreshold,
None,
)
}
#[allow(clippy::too_many_arguments)]
fn store_accounts_custom<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>(
&self,
accounts: impl StorableAccounts<'a, T>,
@ -8793,6 +8810,7 @@ impl AccountsDb {
transactions: Option<&[Option<&SanitizedTransaction>]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
ancestors: Option<&Ancestors>,
) -> StoreAccountsTiming {
let write_version_producer: Box<dyn Iterator<Item = u64>> = write_version_producer
.unwrap_or_else(|| {
@ -8814,6 +8832,7 @@ impl AccountsDb {
write_version_producer,
store_to,
transactions,
ancestors,
);
store_accounts_time.stop();
self.stats
@ -9777,6 +9796,7 @@ impl AccountsDb {
None,
StoreReclaims::Default,
UpdateIndexThreadSelection::PoolWithThreshold,
None,
);
}
@ -11649,7 +11669,7 @@ pub mod tests {
let account0 = AccountSharedData::new(1, 0, &key);
let ancestors = vec![(unrooted_slot, 1)].into_iter().collect();
if is_cached {
db.store_cached((unrooted_slot, &[(&key, &account0)][..]), None);
db.store_cached((unrooted_slot, &[(&key, &account0)][..]), None, None);
} else {
db.store_for_tests(unrooted_slot, &[(&key, &account0)]);
}
@ -12800,6 +12820,7 @@ pub mod tests {
None,
StoreReclaims::Default,
UpdateIndexThreadSelection::PoolWithThreshold,
None,
);
db.add_root(some_slot);
let check_hash = true;
@ -13809,13 +13830,13 @@ pub mod tests {
let account = AccountSharedData::new(1, 16 * 4096, &Pubkey::default());
let pubkey1 = solana_sdk::pubkey::new_rand();
accounts.store_cached((0, &[(&pubkey1, &account)][..]), None);
accounts.store_cached((0, &[(&pubkey1, &account)][..]), None, None);
let pubkey2 = solana_sdk::pubkey::new_rand();
accounts.store_cached((0, &[(&pubkey2, &account)][..]), None);
accounts.store_cached((0, &[(&pubkey2, &account)][..]), None, None);
let zero_account = AccountSharedData::new(0, 1, &Pubkey::default());
accounts.store_cached((1, &[(&pubkey1, &zero_account)][..]), None);
accounts.store_cached((1, &[(&pubkey1, &zero_account)][..]), None, None);
// Add root 0 and flush separately
accounts.calculate_accounts_delta_hash(0);
@ -13856,7 +13877,7 @@ pub mod tests {
for i in 0..num_accounts {
let account = AccountSharedData::new((i + 1) as u64, size, &Pubkey::default());
let pubkey = solana_sdk::pubkey::new_rand();
accounts.store_cached((0 as Slot, &[(&pubkey, &account)][..]), None);
accounts.store_cached((0 as Slot, &[(&pubkey, &account)][..]), None, None);
keys.push(pubkey);
}
// get delta hash to feed these accounts to clean
@ -13870,7 +13891,7 @@ pub mod tests {
for (i, key) in keys[1..].iter().enumerate() {
let account =
AccountSharedData::new((1 + i + num_accounts) as u64, size, &Pubkey::default());
accounts.store_cached((1 as Slot, &[(key, &account)][..]), None);
accounts.store_cached((1 as Slot, &[(key, &account)][..]), None, None);
}
accounts.calculate_accounts_delta_hash(1);
accounts.add_root(1);
@ -13996,7 +14017,7 @@ pub mod tests {
let key = Pubkey::default();
let account0 = AccountSharedData::new(1, 0, &key);
let slot = 0;
db.store_cached((slot, &[(&key, &account0)][..]), None);
db.store_cached((slot, &[(&key, &account0)][..]), None, None);
// Load with no ancestors and no root will return nothing
assert!(db
@ -14028,7 +14049,7 @@ pub mod tests {
let key = Pubkey::default();
let account0 = AccountSharedData::new(1, 0, &key);
let slot = 0;
db.store_cached((slot, &[(&key, &account0)][..]), None);
db.store_cached((slot, &[(&key, &account0)][..]), None, None);
db.mark_slot_frozen(slot);
// No root was added yet, requires an ancestor to find
@ -14060,9 +14081,13 @@ pub mod tests {
let unrooted_key = solana_sdk::pubkey::new_rand();
let key5 = solana_sdk::pubkey::new_rand();
let key6 = solana_sdk::pubkey::new_rand();
db.store_cached((unrooted_slot, &[(&unrooted_key, &account0)][..]), None);
db.store_cached((root5, &[(&key5, &account0)][..]), None);
db.store_cached((root6, &[(&key6, &account0)][..]), None);
db.store_cached(
(unrooted_slot, &[(&unrooted_key, &account0)][..]),
None,
None,
);
db.store_cached((root5, &[(&key5, &account0)][..]), None, None);
db.store_cached((root6, &[(&key6, &account0)][..]), None, None);
for slot in &[unrooted_slot, root5, root6] {
db.mark_slot_frozen(*slot);
}
@ -14124,7 +14149,7 @@ pub mod tests {
let num_slots = 2 * max_cache_slots();
for i in 0..num_roots + num_unrooted {
let key = Pubkey::new_unique();
db.store_cached((i as Slot, &[(&key, &account0)][..]), None);
db.store_cached((i as Slot, &[(&key, &account0)][..]), None, None);
keys.push(key);
db.mark_slot_frozen(i as Slot);
if i < num_roots {
@ -14182,8 +14207,12 @@ pub mod tests {
let zero_lamport_account =
AccountSharedData::new(0, 0, AccountSharedData::default().owner());
let slot1_account = AccountSharedData::new(1, 1, AccountSharedData::default().owner());
db.store_cached((0, &[(&account_key, &zero_lamport_account)][..]), None);
db.store_cached((1, &[(&account_key, &slot1_account)][..]), None);
db.store_cached(
(0, &[(&account_key, &zero_lamport_account)][..]),
None,
None,
);
db.store_cached((1, &[(&account_key, &slot1_account)][..]), None, None);
db.add_root(0);
db.add_root(1);
@ -14238,10 +14267,10 @@ pub mod tests {
let account4_key = Pubkey::new_unique();
let account4 = AccountSharedData::new(0, 1, &owners[1]);
db.store_cached((0, &[(&account1_key, &account1)][..]), None);
db.store_cached((1, &[(&account2_key, &account2)][..]), None);
db.store_cached((2, &[(&account3_key, &account3)][..]), None);
db.store_cached((3, &[(&account4_key, &account4)][..]), None);
db.store_cached((0, &[(&account1_key, &account1)][..]), None, None);
db.store_cached((1, &[(&account2_key, &account2)][..]), None, None);
db.store_cached((2, &[(&account3_key, &account3)][..]), None, None);
db.store_cached((3, &[(&account4_key, &account4)][..]), None, None);
db.add_root(0);
db.add_root(1);
@ -14324,8 +14353,12 @@ pub mod tests {
let zero_lamport_account =
AccountSharedData::new(0, 0, AccountSharedData::default().owner());
let slot1_account = AccountSharedData::new(1, 1, AccountSharedData::default().owner());
db.store_cached((0, &[(&account_key, &zero_lamport_account)][..]), None);
db.store_cached((1, &[(&account_key, &slot1_account)][..]), None);
db.store_cached(
(0, &[(&account_key, &zero_lamport_account)][..]),
None,
None,
);
db.store_cached((1, &[(&account_key, &slot1_account)][..]), None, None);
db.add_root(0);
db.add_root(1);
@ -14382,10 +14415,11 @@ pub mod tests {
db.store_cached(
(0, &[(&zero_lamport_account_key, &slot0_account)][..]),
None,
None,
);
// Second key keeps other lamport account entry for slot 0 alive,
// preventing clean of the zero_lamport_account in slot 1.
db.store_cached((0, &[(&other_account_key, &slot0_account)][..]), None);
db.store_cached((0, &[(&other_account_key, &slot0_account)][..]), None, None);
db.add_root(0);
db.flush_accounts_cache(true, None);
assert!(db.storage.get_slot_storage_entry(0).is_some());
@ -14394,6 +14428,7 @@ pub mod tests {
db.store_cached(
(1, &[(&zero_lamport_account_key, &zero_lamport_account)][..]),
None,
None,
);
// Store into slot 2, which makes all updates from slot 1 outdated.
@ -14405,6 +14440,7 @@ pub mod tests {
db.store_cached(
(2, &[(&zero_lamport_account_key, &zero_lamport_account)][..]),
None,
None,
);
db.add_root(1);
db.add_root(2);
@ -14526,11 +14562,15 @@ pub mod tests {
/ \
1 2 (root)
*/
db.store_cached((0, &[(&account_key, &zero_lamport_account)][..]), None);
db.store_cached((1, &[(&account_key, &slot1_account)][..]), None);
db.store_cached(
(0, &[(&account_key, &zero_lamport_account)][..]),
None,
None,
);
db.store_cached((1, &[(&account_key, &slot1_account)][..]), None, None);
// Fodder for the scan so that the lock on `account_key` is not held
db.store_cached((1, &[(&account_key2, &slot1_account)][..]), None);
db.store_cached((2, &[(&account_key, &slot2_account)][..]), None);
db.store_cached((1, &[(&account_key2, &slot1_account)][..]), None, None);
db.store_cached((2, &[(&account_key, &slot2_account)][..]), None, None);
db.calculate_accounts_delta_hash(0);
let max_scan_root = 0;
@ -14632,7 +14672,7 @@ pub mod tests {
for data_size in 0..num_keys {
let account = AccountSharedData::new(1, data_size, &Pubkey::default());
accounts_db.store_cached((slot, &[(&Pubkey::new_unique(), &account)][..]), None);
accounts_db.store_cached((slot, &[(&Pubkey::new_unique(), &account)][..]), None, None);
}
accounts_db.add_root(slot);
@ -14691,6 +14731,7 @@ pub mod tests {
)][..],
),
None,
None,
);
}
@ -14705,6 +14746,7 @@ pub mod tests {
&[(key, &AccountSharedData::new(1, space, &Pubkey::default()))][..],
),
None,
None,
);
}
accounts_db.add_root(*slot as Slot);
@ -15084,8 +15126,8 @@ pub mod tests {
let account1 = AccountSharedData::new(1, 0, AccountSharedData::default().owner());
// Store into slot 0
db.store_cached((0, &[(&account_key1, &account1)][..]), None);
db.store_cached((0, &[(&account_key2, &account1)][..]), None);
db.store_cached((0, &[(&account_key1, &account1)][..]), None, None);
db.store_cached((0, &[(&account_key2, &account1)][..]), None, None);
db.add_root(0);
if !do_intra_cache_clean {
// If we don't want the cache doing purges before flush,
@ -15117,7 +15159,7 @@ pub mod tests {
db.shrink_candidate_slots(&epoch_schedule);
// Make slot 0 dead by updating the remaining key
db.store_cached((2, &[(&account_key2, &account1)][..]), None);
db.store_cached((2, &[(&account_key2, &account1)][..]), None, None);
db.add_root(2);
// Flushes all roots
@ -15356,6 +15398,7 @@ pub mod tests {
)][..],
),
None,
None,
);
db.add_root(0);
db.flush_accounts_cache(true, None);
@ -15374,7 +15417,7 @@ pub mod tests {
return;
}
account.set_lamports(slot + 1);
db.store_cached((slot, &[(pubkey.as_ref(), &account)][..]), None);
db.store_cached((slot, &[(pubkey.as_ref(), &account)][..]), None, None);
db.add_root(slot);
sleep(Duration::from_millis(RACY_SLEEP_MS));
db.flush_accounts_cache(true, None);
@ -15530,7 +15573,7 @@ pub mod tests {
let num_trials = 10;
for _ in 0..num_trials {
let pubkey = Pubkey::new_unique();
db.store_cached((slot, &[(&pubkey, &account)][..]), None);
db.store_cached((slot, &[(&pubkey, &account)][..]), None, None);
// Wait for both threads to finish
flush_trial_start_sender.send(()).unwrap();
remove_trial_start_sender.send(()).unwrap();
@ -15619,7 +15662,7 @@ pub mod tests {
let slot_to_pubkey_map: HashMap<Slot, Pubkey> = (0..num_cached_slots)
.map(|slot| {
let pubkey = Pubkey::new_unique();
db.store_cached((slot, &[(&pubkey, &account)][..]), None);
db.store_cached((slot, &[(&pubkey, &account)][..]), None, None);
(slot, pubkey)
})
.collect();
@ -16080,19 +16123,19 @@ pub mod tests {
let slot1: Slot = 1;
let account = AccountSharedData::new(111, space, &owner);
accounts_db.store_cached((slot1, &[(&pubkey, &account)][..]), None);
accounts_db.store_cached((slot1, &[(&pubkey, &account)][..]), None, None);
accounts_db.calculate_accounts_delta_hash(slot1);
accounts_db.add_root_and_flush_write_cache(slot1);
let slot2: Slot = 2;
let account = AccountSharedData::new(222, space, &owner);
accounts_db.store_cached((slot2, &[(&pubkey, &account)][..]), None);
accounts_db.store_cached((slot2, &[(&pubkey, &account)][..]), None, None);
accounts_db.calculate_accounts_delta_hash(slot2);
accounts_db.add_root_and_flush_write_cache(slot2);
let slot3: Slot = 3;
let account = AccountSharedData::new(0, space, &owner);
accounts_db.store_cached((slot3, &[(&pubkey, &account)][..]), None);
accounts_db.store_cached((slot3, &[(&pubkey, &account)][..]), None, None);
accounts_db.calculate_accounts_delta_hash(slot3);
accounts_db.add_root_and_flush_write_cache(slot3);
@ -18152,7 +18195,7 @@ pub mod tests {
(&accounts[2].0, &accounts[2].1),
(&accounts[3].0, &accounts[3].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.store_cached((slot, accounts.as_slice()), None, None);
accounts_db.add_root_and_flush_write_cache(slot);
}
@ -18169,7 +18212,7 @@ pub mod tests {
(&accounts[1].0, &accounts[1].1),
(&accounts[4].0, &accounts[4].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.store_cached((slot, accounts.as_slice()), None, None);
accounts_db.add_root_and_flush_write_cache(slot);
}
@ -18215,7 +18258,7 @@ pub mod tests {
(&accounts[5].0, &accounts[5].1),
(&accounts[6].0, &accounts[6].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.store_cached((slot, accounts.as_slice()), None, None);
accounts_db.add_root_and_flush_write_cache(slot);
}
@ -18236,7 +18279,7 @@ pub mod tests {
(&accounts[5].0, &accounts[5].1),
(&accounts[7].0, &accounts[7].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.store_cached((slot, accounts.as_slice()), None, None);
accounts_db.add_root_and_flush_write_cache(slot);
}

View File

@ -2,11 +2,17 @@ use {
crate::{
account_storage::meta::{StoredAccountMeta, StoredMeta},
accounts_db::AccountsDb,
accounts_index::AccountIndexGetResult,
ancestors::Ancestors,
},
solana_measure::measure::Measure,
solana_metrics::*,
solana_sdk::{
account::AccountSharedData, clock::Slot, pubkey::Pubkey, transaction::SanitizedTransaction,
account::{AccountSharedData, ReadableAccount},
clock::Slot,
pubkey::Pubkey,
system_program,
transaction::SanitizedTransaction,
},
std::collections::{HashMap, HashSet},
};
@ -68,10 +74,27 @@ impl AccountsDb {
txn: &Option<&SanitizedTransaction>,
pubkey: &Pubkey,
write_version_producer: &mut P,
ancestors: Option<&Ancestors>,
) where
P: Iterator<Item = u64>,
{
if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
let loaded_account = if let AccountIndexGetResult::Found(lock, index) =
self.accounts_index.get(pubkey, ancestors, Some(slot))
{
let (slot, account_info) = &lock.slot_list()[index];
self.get_account_accessor(*slot, pubkey, &account_info.storage_location())
.get_loaded_account()
.map(|loaded_account| loaded_account.take_account())
} else {
None
};
if let Some(loaded_account) = &loaded_account {
if *loaded_account.owner() != system_program::id() {
println!("{:?} {:?}", pubkey, loaded_account.owner());
}
}
let notifier = &accounts_update_notifier.read().unwrap();
notifier.notify_account_update(
slot,
@ -79,6 +102,7 @@ impl AccountsDb {
txn,
pubkey,
write_version_producer.next().unwrap(),
loaded_account.as_ref(),
);
}
}
@ -191,6 +215,7 @@ pub mod tests {
struct GeyserTestPlugin {
pub accounts_notified: DashMap<Pubkey, Vec<(Slot, AccountSharedData)>>,
pub is_startup_done: AtomicBool,
pub old_account_data: DashMap<Pubkey, Vec<(Slot, Option<AccountSharedData>)>>,
}
impl AccountsUpdateNotifierInterface for GeyserTestPlugin {
@ -202,11 +227,16 @@ pub mod tests {
_txn: &Option<&SanitizedTransaction>,
pubkey: &Pubkey,
_write_version: u64,
old_account_data: Option<&AccountSharedData>,
) {
self.accounts_notified
.entry(*pubkey)
.or_default()
.push((slot, account.clone()));
self.old_account_data
.entry(*pubkey)
.or_default()
.push((slot, old_account_data.cloned()));
}
/// Notified when the AccountsDb is initialized at start when restored
@ -353,57 +383,87 @@ pub mod tests {
let account1 =
AccountSharedData::new(account1_lamports1, 1, AccountSharedData::default().owner());
let slot0 = 0;
accounts.store_cached((slot0, &[(&key1, &account1)][..]), None);
accounts.store_cached((slot0, &[(&key1, &account1)][..]), None, None);
let key2 = solana_sdk::pubkey::new_rand();
let account2_lamports: u64 = 200;
let account2 =
AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
accounts.store_cached((slot0, &[(&key2, &account2)][..]), None);
accounts.store_cached((slot0, &[(&key2, &account2)][..]), None, None);
let account1_lamports2 = 2;
let slot1 = 1;
let account1 = AccountSharedData::new(account1_lamports2, 1, account1.owner());
accounts.store_cached((slot1, &[(&key1, &account1)][..]), None);
accounts.store_cached((slot1, &[(&key1, &account1)][..]), None, None);
let account3_owner = Pubkey::new_unique();
let key3 = solana_sdk::pubkey::new_rand();
let account3_lamports: u64 = 300;
let account3 =
AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner());
accounts.store_cached((slot1, &[(&key3, &account3)][..]), None);
let account3 = AccountSharedData::new(account3_lamports, 1, &account3_owner);
accounts.store_cached((slot1, &[(&key3, &account3)][..]), None, None);
let notifier = notifier.write().unwrap();
assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 2);
assert_eq!(
notifier.accounts_notified.get(&key1).unwrap()[0]
.1
.lamports(),
account1_lamports1
);
assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot0);
assert_eq!(
notifier.accounts_notified.get(&key1).unwrap()[1]
.1
.lamports(),
account1_lamports2
);
assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[1].0, slot1);
// delete account 3
let account3_lamports: u64 = 0;
let account3 = AccountSharedData::new(account3_lamports, 1, &account3_owner);
accounts.store_cached((slot1, &[(&key3, &account3)][..]), None, None);
assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1);
assert_eq!(
notifier.accounts_notified.get(&key2).unwrap()[0]
{
let notifier = notifier.write().unwrap();
assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 2);
assert_eq!(
notifier.accounts_notified.get(&key1).unwrap()[0]
.1
.lamports(),
account1_lamports1
);
assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot0);
assert_eq!(
notifier.accounts_notified.get(&key1).unwrap()[1]
.1
.lamports(),
account1_lamports2
);
assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[1].0, slot1);
assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1);
assert_eq!(
notifier.accounts_notified.get(&key2).unwrap()[0]
.1
.lamports(),
account2_lamports
);
assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0);
assert_eq!(notifier.accounts_notified.get(&key3).unwrap().len(), 2);
assert_eq!(
notifier.accounts_notified.get(&key3).unwrap()[0]
.1
.lamports(),
300
);
assert_eq!(
notifier.accounts_notified.get(&key3).unwrap()[0].1.owner(),
&account3_owner
);
assert_eq!(notifier.accounts_notified.get(&key3).unwrap()[0].0, slot1);
assert_eq!(notifier.accounts_notified.get(&key3).unwrap()[1].0, slot1);
assert_eq!(
notifier.accounts_notified.get(&key3).unwrap()[1]
.1
.lamports(),
0
);
assert_eq!(
notifier.accounts_notified.get(&key3).unwrap()[1].1.owner(),
&Pubkey::default()
);
assert!(notifier.old_account_data.get(&key3).unwrap()[1].1.is_some());
let old_account = notifier.old_account_data.get(&key3).unwrap()[1]
.1
.lamports(),
account2_lamports
);
assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0);
assert_eq!(notifier.accounts_notified.get(&key3).unwrap().len(), 1);
assert_eq!(
notifier.accounts_notified.get(&key3).unwrap()[0]
.1
.lamports(),
account3_lamports
);
assert_eq!(notifier.accounts_notified.get(&key3).unwrap()[0].0, slot1);
.clone()
.unwrap();
assert_eq!(old_account.lamports(), 300);
assert_eq!(old_account.owner(), &account3_owner);
}
}
}

View File

@ -15,6 +15,7 @@ pub trait AccountsUpdateNotifierInterface: std::fmt::Debug {
txn: &Option<&SanitizedTransaction>,
pubkey: &Pubkey,
write_version: u64,
old_account_data: Option<&AccountSharedData>,
);
/// Notified when the AccountsDb is initialized at start when restored

View File

@ -108,6 +108,60 @@ pub struct ReplicaAccountInfoV3<'a> {
pub txn: Option<&'a SanitizedTransaction>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaOldAccountInfo<'a> {
/// The lamports for the account
pub lamports: u64,
/// The Pubkey of the owner program account
pub owner: &'a [u8],
/// This account's data contains a loaded program (and is now read-only)
pub executable: bool,
/// The epoch at which this account will next owe rent
pub rent_epoch: u64,
/// The data held in this account.
pub data: &'a [u8],
}
#[derive(Debug, Clone, PartialEq, Eq)]
/// Information about an account being updated
/// (extended with reference to transaction doing this update)
pub struct ReplicaAccountInfoV4<'a> {
/// The Pubkey for the account
pub pubkey: &'a [u8],
/// The lamports for the account
pub lamports: u64,
/// The Pubkey of the owner program account
pub owner: &'a [u8],
/// This account's data contains a loaded program (and is now read-only)
pub executable: bool,
/// The epoch at which this account will next owe rent
pub rent_epoch: u64,
/// The data held in this account.
pub data: &'a [u8],
/// A global monotonically increasing atomic number, which can be used
/// to tell the order of the account update. For example, when an
/// account is updated in the same slot multiple times, the update
/// with higher write_version should supersede the one with lower
/// write_version.
pub write_version: u64,
/// Reference to transaction causing this account modification
pub txn: Option<&'a SanitizedTransaction>,
/// old account data
pub old_account_data: Option<ReplicaOldAccountInfo<'a>>,
}
/// A wrapper to future-proof ReplicaAccountInfo handling.
/// If there were a change to the structure of ReplicaAccountInfo,
/// there would be new enum entry for the newer version, forcing
@ -116,6 +170,7 @@ pub enum ReplicaAccountInfoVersions<'a> {
V0_0_1(&'a ReplicaAccountInfo<'a>),
V0_0_2(&'a ReplicaAccountInfoV2<'a>),
V0_0_3(&'a ReplicaAccountInfoV3<'a>),
V0_0_4(&'a ReplicaAccountInfoV4<'a>),
}
/// Information about a transaction

View File

@ -2,7 +2,7 @@
use {
crate::geyser_plugin_manager::GeyserPluginManager,
agave_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaAccountInfoV3, ReplicaAccountInfoVersions,
ReplicaAccountInfoV4, ReplicaAccountInfoVersions, ReplicaOldAccountInfo,
},
log::*,
solana_accounts_db::{
@ -33,12 +33,16 @@ impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl {
txn: &Option<&SanitizedTransaction>,
pubkey: &Pubkey,
write_version: u64,
old_account: Option<&AccountSharedData>,
) {
if let Some(account_info) =
self.accountinfo_from_shared_account_data(account, txn, pubkey, write_version)
{
self.notify_plugins_of_account_update(account_info, slot, false);
}
let account_info = self.accountinfo_from_shared_account_data(
account,
txn,
pubkey,
write_version,
old_account,
);
self.notify_plugins_of_account_update(account_info, slot, false);
}
fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) {
@ -120,8 +124,9 @@ impl AccountsUpdateNotifierImpl {
txn: &'a Option<&'a SanitizedTransaction>,
pubkey: &'a Pubkey,
write_version: u64,
) -> Option<ReplicaAccountInfoV3<'a>> {
Some(ReplicaAccountInfoV3 {
old_account: Option<&'a AccountSharedData>,
) -> ReplicaAccountInfoV4<'a> {
ReplicaAccountInfoV4 {
pubkey: pubkey.as_ref(),
lamports: account.lamports(),
owner: account.owner().as_ref(),
@ -130,14 +135,21 @@ impl AccountsUpdateNotifierImpl {
data: account.data(),
write_version,
txn: *txn,
})
old_account_data: old_account.map(|account| ReplicaOldAccountInfo {
data: account.data(),
executable: account.executable(),
lamports: account.lamports(),
owner: account.owner().as_ref(),
rent_epoch: account.rent_epoch(),
}),
}
}
fn accountinfo_from_stored_account_meta<'a>(
&self,
stored_account_meta: &'a StoredAccountMeta,
) -> Option<ReplicaAccountInfoV3<'a>> {
Some(ReplicaAccountInfoV3 {
) -> Option<ReplicaAccountInfoV4<'a>> {
Some(ReplicaAccountInfoV4 {
pubkey: stored_account_meta.pubkey().as_ref(),
lamports: stored_account_meta.lamports(),
owner: stored_account_meta.owner().as_ref(),
@ -146,12 +158,13 @@ impl AccountsUpdateNotifierImpl {
data: stored_account_meta.data(),
write_version: stored_account_meta.write_version(),
txn: None,
old_account_data: None,
})
}
fn notify_plugins_of_account_update(
&self,
account: ReplicaAccountInfoV3,
account: ReplicaAccountInfoV4,
slot: Slot,
is_startup: bool,
) {
@ -164,7 +177,7 @@ impl AccountsUpdateNotifierImpl {
for plugin in plugin_manager.plugins.iter() {
let mut measure = Measure::start("geyser-plugin-update-account");
match plugin.update_account(
ReplicaAccountInfoVersions::V0_0_3(&account),
ReplicaAccountInfoVersions::V0_0_4(&account),
slot,
is_startup,
) {

View File

@ -108,7 +108,6 @@ impl GeyserPluginManager {
}
false
}
/// Admin RPC request handler
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())

View File

@ -552,9 +552,13 @@ impl JsonRpcRequestProcessor {
optimize_filters(&mut filters);
let keyed_accounts = {
if let Some(owner) = get_spl_token_owner_filter(program_id, &filters) {
self.get_filtered_spl_token_accounts_by_owner_compressed(&bank, program_id, &owner, filters)?
self.get_filtered_spl_token_accounts_by_owner_compressed(
&bank, program_id, &owner, filters,
)?
} else if let Some(mint) = get_spl_token_mint_filter(program_id, &filters) {
self.get_filtered_spl_token_accounts_by_mint_compressed(&bank, program_id, &mint, filters)?
self.get_filtered_spl_token_accounts_by_mint_compressed(
&bank, program_id, &mint, filters,
)?
} else {
self.get_filtered_program_accounts_compressed(
&bank,
@ -568,11 +572,9 @@ impl JsonRpcRequestProcessor {
let compressed_list: Vec<RpcKeyedCompressedAccount> = keyed_accounts
.iter()
.map(|(pubkey, account)| {
RpcKeyedCompressedAccount {
p: pubkey.to_string(),
a: BASE64_STANDARD.encode(account),
}
.map(|(pubkey, account)| RpcKeyedCompressedAccount {
p: pubkey.to_string(),
a: BASE64_STANDARD.encode(account),
})
.collect_vec();

View File

@ -36,7 +36,6 @@
#[allow(deprecated)]
use solana_sdk::recent_blockhashes_account;
pub use solana_sdk::reward_type::RewardType;
use solana_sdk::transaction_context::TransactionAccountCompressed;
use {
crate::{
bank::metrics::*,
@ -172,7 +171,8 @@ use {
TransactionVerificationMode, VersionedTransaction, MAX_TX_ACCOUNT_LOCKS,
},
transaction_context::{
ExecutionRecord, TransactionAccount, TransactionContext, TransactionReturnData,
ExecutionRecord, TransactionAccount, TransactionAccountCompressed, TransactionContext,
TransactionReturnData,
},
},
solana_stake_program::stake_state::{
@ -5665,6 +5665,7 @@ impl Bank {
&durable_nonce,
lamports_per_signature,
self.include_slot_in_hash(),
Some(&self.ancestors),
);
let rent_debits = self.collect_rent(&execution_results, loaded_txs);

View File

@ -67,6 +67,7 @@ fn test_shrink_and_clean() {
INCLUDE_SLOT_IN_HASH_TESTS,
),
None,
None,
);
}
accounts.add_root(current_slot);
@ -136,6 +137,7 @@ fn test_bad_bank_hash() {
db.store_cached(
(some_slot, &account_refs[..], INCLUDE_SLOT_IN_HASH_TESTS),
None,
None,
);
for pass in 0..2 {
for (key, account) in &account_refs {