reverting account store to use Pubkey rather than partial key

This commit is contained in:
godmodegalactus 2024-07-13 14:32:45 +02:00
parent 3d602ef6ab
commit 7fe6a390d8
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
2 changed files with 64 additions and 109 deletions

View File

@ -12,7 +12,6 @@ use lite_account_manager_common::{
account_filters_interface::AccountFiltersStoreInterface, account_filters_interface::AccountFiltersStoreInterface,
account_store_interface::{AccountLoadingError, AccountStorageInterface}, account_store_interface::{AccountLoadingError, AccountStorageInterface},
commitment::Commitment, commitment::Commitment,
pubkey_container_utils::PartialPubkey,
slot_info::SlotInfo, slot_info::SlotInfo,
}; };
use prometheus::{opts, register_int_gauge, IntGauge}; use prometheus::{opts, register_int_gauge, IntGauge};
@ -38,7 +37,7 @@ struct SlotStatus {
} }
pub struct InmemoryAccountStore { pub struct InmemoryAccountStore {
account_store: Arc<DashMap<PartialPubkey<8>, Vec<AccountDataByCommitment>>>, account_store: Arc<DashMap<Pubkey, AccountDataByCommitment>>,
accounts_by_owner: Arc<DashMap<Pubkey, HashSet<Pubkey>>>, accounts_by_owner: Arc<DashMap<Pubkey, HashSet<Pubkey>>>,
slots_status: Arc<Mutex<BTreeMap<Slot, SlotStatus>>>, slots_status: Arc<Mutex<BTreeMap<Slot, SlotStatus>>>,
filtered_accounts: Arc<dyn AccountFiltersStoreInterface>, filtered_accounts: Arc<dyn AccountFiltersStoreInterface>,
@ -147,12 +146,7 @@ impl InmemoryAccountStore {
} }
pub fn account_store_contains_key(&self, pubkey: &Pubkey) -> bool { pub fn account_store_contains_key(&self, pubkey: &Pubkey) -> bool {
match self.account_store.entry(pubkey.into()) { self.account_store.contains_key(pubkey)
dashmap::mapref::entry::Entry::Occupied(occ) => {
occ.get().iter().any(|x| x.pubkey == *pubkey)
}
dashmap::mapref::entry::Entry::Vacant(_) => false,
}
} }
} }
@ -174,43 +168,38 @@ impl AccountStorageInterface for InmemoryAccountStore {
.maybe_update_slot_status(&account_data, commitment) .maybe_update_slot_status(&account_data, commitment)
.await; .await;
match self.account_store.entry(account_data.pubkey.into()) { match self.account_store.entry(account_data.pubkey) {
dashmap::mapref::entry::Entry::Occupied(mut occ) => { dashmap::mapref::entry::Entry::Occupied(mut occ) => {
let account_datas = occ.get_mut(); let account_data_in_store = occ.get_mut();
for account_data_in_store in account_datas { let prev_account = account_data_in_store.get_account_data(commitment);
if account_data.pubkey == account_data_in_store.pubkey { // if account has been updated
let prev_account = account_data_in_store.get_account_data(commitment); if account_data_in_store.update(account_data.clone(), commitment) {
// if account has been updated if let Some(prev_account) = prev_account {
if account_data_in_store.update(account_data.clone(), commitment) { if self
if let Some(prev_account) = prev_account { .update_owner_delete_if_necessary(
if self &prev_account,
.update_owner_delete_if_necessary( &account_data,
&prev_account, commitment,
&account_data, )
commitment, .await
) {
.await occ.remove_entry();
{
occ.remove_entry();
}
}
return true;
} else {
return false;
} }
} }
true
} else {
false
} }
false
} }
dashmap::mapref::entry::Entry::Vacant(vac) => { dashmap::mapref::entry::Entry::Vacant(vac) => {
if self.satisfies_filters(&account_data).await { if self.satisfies_filters(&account_data).await {
ACCOUNT_STORED_IN_MEMORY.inc(); ACCOUNT_STORED_IN_MEMORY.inc();
self.add_account_owner(account_data.pubkey, account_data.account.owner); self.add_account_owner(account_data.pubkey, account_data.account.owner);
vac.insert(vec![AccountDataByCommitment::new( vac.insert(AccountDataByCommitment::new(
account_data.clone(), account_data.clone(),
commitment, commitment,
)]); ));
true true
} else { } else {
false false
@ -229,24 +218,19 @@ impl AccountStorageInterface for InmemoryAccountStore {
account_pk: Pubkey, account_pk: Pubkey,
commitment: Commitment, commitment: Commitment,
) -> Result<Option<AccountData>, AccountLoadingError> { ) -> Result<Option<AccountData>, AccountLoadingError> {
match self.account_store.entry(account_pk.into()) { match self.account_store.entry(account_pk) {
dashmap::mapref::entry::Entry::Occupied(occ) => { dashmap::mapref::entry::Entry::Occupied(occ) => {
let accs = occ.get(); let acc = occ.get();
for acc in accs { let acc = acc.get_account_data(commitment);
if acc.pubkey == account_pk { if let Some(account) = &acc {
let acc = acc.get_account_data(commitment); if account.account.lamports > 0 {
if let Some(account) = &acc { Ok(acc)
if account.account.lamports > 0 { } else {
return Ok(acc); Ok(None)
} else {
return Ok(None);
}
} else {
return Ok(None);
}
} }
} else {
Ok(None)
} }
Ok(None)
} }
dashmap::mapref::entry::Entry::Vacant(_) => Ok(None), dashmap::mapref::entry::Entry::Vacant(_) => Ok(None),
} }
@ -380,55 +364,34 @@ impl AccountStorageInterface for InmemoryAccountStore {
let mut updated_accounts = vec![]; let mut updated_accounts = vec![];
for (writable_account, update_slot) in writable_accounts { for (writable_account, update_slot) in writable_accounts {
match self.account_store.entry(writable_account.into()) { match self.account_store.entry(writable_account) {
dashmap::mapref::entry::Entry::Occupied(mut occ) => { dashmap::mapref::entry::Entry::Occupied(mut occ) => {
let mut do_delete = false; if let Some((account_data, prev_account_data)) = occ
for acc in occ.get_mut() { .get_mut()
if acc.pubkey == writable_account { .promote_slot_commitment(writable_account, update_slot, commitment)
if let Some((account_data, prev_account_data)) = acc {
.promote_slot_commitment(writable_account, update_slot, commitment) if let Some(prev_account_data) = prev_account_data {
// check if owner has changed
if (prev_account_data.account.owner != account_data.account.owner
|| account_data.account.lamports == 0)
&& self
.update_owner_delete_if_necessary(
&prev_account_data,
&account_data,
commitment,
)
.await
{ {
if let Some(prev_account_data) = prev_account_data { occ.remove();
// check if owner has changed
if (prev_account_data.account.owner
!= account_data.account.owner
|| account_data.account.lamports == 0)
&& self
.update_owner_delete_if_necessary(
&prev_account_data,
&account_data,
commitment,
)
.await
{
do_delete = true;
}
//check if account data has changed
if prev_account_data != account_data {
updated_accounts.push(account_data);
}
} else {
// account has been confirmed first time
updated_accounts.push(account_data);
}
} }
}
}
if do_delete { //check if account data has changed
// only has one element i.e it is element that we want to remove if prev_account_data != account_data {
if occ.get().len() == 1 { updated_accounts.push(account_data);
occ.remove_entry(); }
} else { } else {
let index = occ // account has been confirmed first time
.get() updated_accounts.push(account_data);
.iter()
.enumerate()
.find(|x| x.1.pubkey == writable_account)
.map(|(index, _)| index)
.unwrap();
occ.get_mut().swap_remove(index);
} }
} }
} }
@ -764,11 +727,10 @@ mod tests {
assert_eq!( assert_eq!(
store store
.account_store .account_store
.get(&pk1.into()) .get(&pk1)
.unwrap() .unwrap()
.iter() .processed_accounts
.map(|x| x.processed_accounts.len()) .len(),
.sum::<usize>(),
12 12
); );
store store
@ -777,11 +739,10 @@ mod tests {
assert_eq!( assert_eq!(
store store
.account_store .account_store
.get(&pk1.into()) .get(&pk1)
.unwrap() .unwrap()
.iter() .processed_accounts
.map(|x| x.processed_accounts.len()) .len(),
.sum::<usize>(),
1 1
); );

View File

@ -1,22 +1,16 @@
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct PartialPubkey<const SIZE: usize> { pub struct PartialPubkey<const SIZE: usize>([u8; SIZE]);
partial_pubkey_bytes: [u8; SIZE],
}
impl<const SIZE: usize> From<Pubkey> for PartialPubkey<SIZE> { impl<const SIZE: usize> From<Pubkey> for PartialPubkey<SIZE> {
fn from(value: Pubkey) -> Self { fn from(value: Pubkey) -> Self {
Self { Self(value.to_bytes()[0..SIZE].try_into().unwrap())
partial_pubkey_bytes: value.to_bytes()[0..SIZE].try_into().unwrap(),
}
} }
} }
impl<const SIZE: usize> From<&Pubkey> for PartialPubkey<SIZE> { impl<const SIZE: usize> From<&Pubkey> for PartialPubkey<SIZE> {
fn from(value: &Pubkey) -> Self { fn from(value: &Pubkey) -> Self {
Self { Self(value.to_bytes()[0..SIZE].try_into().unwrap())
partial_pubkey_bytes: value.to_bytes()[0..SIZE].try_into().unwrap(),
}
} }
} }