making account memory store more optimized

This commit is contained in:
Godmode Galactus 2024-11-23 21:47:57 +01:00
parent 5e36199f17
commit cbda12079e
No known key found for this signature in database
GPG Key ID: A6B75566742EA987
2 changed files with 120 additions and 90 deletions

View File

@ -266,4 +266,10 @@ impl AccountDataByCommitment {
None
}
}
pub fn delete(&mut self) {
self.processed_accounts.clear();
self.confirmed_account = None;
self.finalized_account = None;
}
}

View File

@ -31,35 +31,38 @@ lazy_static::lazy_static! {
struct SlotStatus {
pub commitment: Commitment,
pub accounts_updated: HashSet<Pubkey>,
pub accounts_updated: HashSet<AccountIndex>,
pub parent: Option<Slot>,
}
type AccountIndex = usize;
pub struct InmemoryAccountStore {
account_store: Arc<DashMap<Pubkey, AccountDataByCommitment>>,
accounts_by_owner: Arc<DashMap<Pubkey, Arc<RwLock<HashSet<Pubkey>>>>>,
pubkey_to_account_index: Arc<DashMap<Pubkey, AccountIndex>>,
accounts_by_owner: Arc<DashMap<Pubkey, Arc<RwLock<HashSet<AccountIndex>>>>>,
slots_status: Arc<Mutex<BTreeMap<Slot, SlotStatus>>>,
filtered_accounts: Arc<dyn AccountFiltersStoreInterface>,
accounts_store: RwLock<Vec<RwLock<AccountDataByCommitment>>>,
}
impl InmemoryAccountStore {
pub fn new(filtered_accounts: Arc<dyn AccountFiltersStoreInterface>) -> Self {
Self {
account_store: Arc::new(DashMap::new()),
pubkey_to_account_index: Arc::new(DashMap::new()),
accounts_by_owner: Arc::new(DashMap::new()),
slots_status: Arc::new(Mutex::new(BTreeMap::new())),
filtered_accounts,
accounts_store: RwLock::new(vec![]),
}
}
fn add_account_owner(&self, account: Pubkey, owner: Pubkey) {
fn add_account_owner(&self, account_index: AccountIndex, owner: Pubkey) {
match self.accounts_by_owner.entry(owner) {
dashmap::mapref::entry::Entry::Occupied(mut occ) => {
occ.get_mut().write().unwrap().insert(account);
occ.get_mut().write().unwrap().insert(account_index);
}
dashmap::mapref::entry::Entry::Vacant(vc) => {
let mut set = HashSet::new();
set.insert(account);
set.insert(account_index);
vc.insert(Arc::new(RwLock::new(set)));
}
}
@ -72,6 +75,7 @@ impl InmemoryAccountStore {
&self,
prev_account_data: &AccountData,
new_account_data: &AccountData,
account_index: AccountIndex,
commitment: Commitment,
) -> bool {
assert_eq!(prev_account_data.pubkey, new_account_data.pubkey);
@ -82,10 +86,7 @@ impl InmemoryAccountStore {
.entry(prev_account_data.account.owner)
{
dashmap::mapref::entry::Entry::Occupied(mut occ) => {
occ.get_mut()
.write()
.unwrap()
.remove(&prev_account_data.pubkey);
occ.get_mut().write().unwrap().remove(&account_index);
}
dashmap::mapref::entry::Entry::Vacant(_) => {
// do nothing
@ -99,18 +100,19 @@ impl InmemoryAccountStore {
}
if !Self::is_deleted(new_account_data) && self.satisfies_filters(new_account_data) {
// update owner if account was not deleted but owner was change and the filter criterias are satisfied
self.add_account_owner(new_account_data.pubkey, new_account_data.account.owner);
self.add_account_owner(account_index, new_account_data.account.owner);
}
}
false
}
fn maybe_update_slot_status(
// commitment for which status was updated
fn add_account_index_to_slot_status(
&self,
account_data: &AccountData,
slot: Slot,
account_index: AccountIndex,
commitment: Commitment,
) -> Commitment {
let slot = account_data.updated_slot;
let mut lk = self.slots_status.lock().unwrap();
let slot_status = match lk.get_mut(&slot) {
Some(x) => x,
@ -129,13 +131,32 @@ impl InmemoryAccountStore {
match commitment {
Commitment::Processed | Commitment::Confirmed => {
// insert account into slot status
slot_status.accounts_updated.insert(account_data.pubkey);
slot_status.accounts_updated.insert(account_index);
slot_status.commitment
}
Commitment::Finalized => commitment,
}
}
fn maybe_update_slot_status(
&self,
account_data: &AccountData,
commitment: Commitment,
) -> Commitment {
let account_index = match self
.pubkey_to_account_index
.get(&account_data.pubkey)
.map(|x| *x)
{
Some(x) => x,
None => {
return commitment;
}
};
let slot = account_data.updated_slot;
self.add_account_index_to_slot_status(slot, account_index, commitment)
}
pub fn satisfies_filters(&self, account: &AccountData) -> bool {
self.filtered_accounts.satisfies(account)
}
@ -145,7 +166,7 @@ impl InmemoryAccountStore {
}
pub fn account_store_contains_key(&self, pubkey: &Pubkey) -> bool {
self.account_store.contains_key(pubkey)
self.pubkey_to_account_index.contains_key(pubkey)
}
}
@ -160,23 +181,27 @@ impl AccountStorageInterface for InmemoryAccountStore {
{
return false;
}
let updated_slot = account_data.updated_slot;
// check if the blockhash and slot is already confirmed
let commitment = self.maybe_update_slot_status(&account_data, commitment);
match self.account_store.entry(account_data.pubkey) {
dashmap::mapref::entry::Entry::Occupied(mut occ) => {
let account_data_in_store = occ.get_mut();
let prev_account = account_data_in_store.get_account_data(commitment);
let updated = match self.pubkey_to_account_index.entry(account_data.pubkey) {
dashmap::mapref::entry::Entry::Occupied(occ) => {
let index = *occ.get();
let read_lk = self.accounts_store.read().unwrap();
let mut acc_by_commitment = read_lk[index].write().unwrap();
let prev_account = acc_by_commitment.get_account_data(commitment);
// if account has been updated
if account_data_in_store.update(account_data.clone(), commitment) {
if acc_by_commitment.update(account_data.clone(), commitment) {
if let Some(prev_account) = prev_account {
if self.update_owner_delete_if_necessary(
&prev_account,
&account_data,
index,
commitment,
) {
acc_by_commitment.delete();
occ.remove_entry();
}
}
@ -188,17 +213,23 @@ impl AccountStorageInterface for InmemoryAccountStore {
dashmap::mapref::entry::Entry::Vacant(vac) => {
if self.satisfies_filters(&account_data) {
ACCOUNT_STORED_IN_MEMORY.inc();
self.add_account_owner(account_data.pubkey, account_data.account.owner);
vac.insert(AccountDataByCommitment::new(
let mut lk = self.accounts_store.write().unwrap();
lk.push(RwLock::new(AccountDataByCommitment::new(
account_data.clone(),
commitment,
));
)));
let index = lk.len() - 1;
drop(lk);
self.add_account_owner(index, account_data.account.owner);
self.add_account_index_to_slot_status(updated_slot, index, commitment);
vac.insert(index);
true
} else {
false
}
}
}
};
updated
}
fn initilize_or_update_account(&self, account_data: AccountData) {
@ -210,10 +241,14 @@ impl AccountStorageInterface for InmemoryAccountStore {
account_pk: Pubkey,
commitment: Commitment,
) -> Result<Option<AccountData>, AccountLoadingError> {
match self.account_store.entry(account_pk) {
dashmap::mapref::entry::Entry::Occupied(occ) => {
let account = occ.get().get_account_data(commitment);
drop(occ);
let account_index = self.pubkey_to_account_index.get(&account_pk).map(|x| *x);
let Some(account_index) = account_index else {
return Ok(None);
};
let account = self.accounts_store.read().unwrap()[account_index]
.read()
.unwrap()
.get_account_data(commitment);
if let Some(account_data) = &account {
if account_data.account.lamports > 0 {
Ok(account)
@ -224,9 +259,6 @@ impl AccountStorageInterface for InmemoryAccountStore {
Ok(None)
}
}
dashmap::mapref::entry::Entry::Vacant(_) => Ok(None),
}
}
fn get_program_accounts(
&self,
@ -249,30 +281,26 @@ impl AccountStorageInterface for InmemoryAccountStore {
};
let mut return_vec = vec![];
for program_account in lk.read().unwrap().iter() {
for program_account_index in lk.read().unwrap().iter() {
match &account_filters {
Some(account_filters) => {
match self.account_store.entry(*program_account) {
dashmap::mapref::entry::Entry::Occupied(occ) => {
let account = occ
.get()
.get_account_data_filtered(commitment, account_filters);
drop(occ);
if let Some(account_data) = account {
let read_lk = self.accounts_store.read().unwrap();
let lk = read_lk[*program_account_index].read().unwrap();
if let Some(account_data) =
lk.get_account_data_filtered(commitment, account_filters)
{
if account_data.account.lamports > 0
&& account_data.account.owner == program_pubkey
{
return_vec.push(account_data);
}
}
}
dashmap::mapref::entry::Entry::Vacant(_) => {
// do nothing
}
};
}
None => {
let account_data = self.get_account(*program_account, commitment)?;
let account_data = self.accounts_store.read().unwrap()[*program_account_index]
.read()
.unwrap()
.get_account_data(commitment);
if let Some(account_data) = account_data {
// recheck owner
if program_pubkey == account_data.account.owner {
@ -290,7 +318,7 @@ impl AccountStorageInterface for InmemoryAccountStore {
let writable_accounts = {
let mut lk = self.slots_status.lock().unwrap();
let mut current_slot = Some((slot, Some(slot_info.parent)));
let mut writable_accounts: HashMap<Pubkey, Slot> = HashMap::new();
let mut writable_accounts: HashMap<AccountIndex, Slot> = HashMap::new();
while let Some((slot, parent)) = current_slot {
current_slot = None;
match lk.get_mut(&slot) {
@ -364,12 +392,12 @@ impl AccountStorageInterface for InmemoryAccountStore {
};
let mut updated_accounts = vec![];
for (writable_account, update_slot) in writable_accounts {
match self.account_store.entry(writable_account) {
dashmap::mapref::entry::Entry::Occupied(mut occ) => {
if let Some((account_data, prev_account_data)) = occ
.get_mut()
.promote_slot_commitment(update_slot, commitment)
let lk = self.accounts_store.read().unwrap();
for (writable_account_index, update_slot) in writable_accounts {
let mut writable_lk = lk[writable_account_index].write().unwrap();
if let Some((account_data, prev_account_data)) =
writable_lk.promote_slot_commitment(update_slot, commitment)
{
if let Some(prev_account_data) = prev_account_data {
// check if owner has changed
@ -378,21 +406,18 @@ impl AccountStorageInterface for InmemoryAccountStore {
&& self.update_owner_delete_if_necessary(
&prev_account_data,
&account_data,
writable_account_index,
commitment,
)
{
occ.remove();
self.pubkey_to_account_index.remove(&account_data.pubkey);
writable_lk.delete();
}
}
// account has been confirmed first time
updated_accounts.push(account_data);
}
}
dashmap::mapref::entry::Entry::Vacant(_) => {
// do nothing
}
}
}
updated_accounts
}
@ -697,10 +722,10 @@ mod tests {
let last_account = create_random_account(&mut rng, 11, pk1, program);
store.update_account(last_account.clone(), Commitment::Processed);
let acc_index = store.pubkey_to_account_index.get(&pk1).unwrap();
assert_eq!(
store
.account_store
.get(&pk1)
store.accounts_store.read().unwrap()[*acc_index]
.read()
.unwrap()
.processed_accounts
.len(),
@ -708,9 +733,8 @@ mod tests {
);
store.process_slot_data(new_slot_info(11), Commitment::Finalized);
assert_eq!(
store
.account_store
.get(&pk1)
store.accounts_store.read().unwrap()[*acc_index]
.read()
.unwrap()
.processed_accounts
.len(),