Implementing gPA and getAccount for token program index

This commit is contained in:
godmodegalactus 2024-07-03 12:01:05 +02:00
parent 30f658da57
commit b617ea031b
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
3 changed files with 188 additions and 75 deletions

View File

@ -431,7 +431,7 @@ mod tests {
pub fn new_slot_info(slot: u64) -> SlotInfo {
SlotInfo {
slot: slot,
slot,
parent: slot.saturating_sub(1),
root: 0,
}

View File

@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
collections::{BTreeMap, HashMap, VecDeque},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
@ -130,7 +130,7 @@ impl ProcessedAccountStore {
Some(acc) => acc
.processed_accounts
.iter()
.map(|(_, account)| {
.filter_map(|(_, account)| {
token_program_account_to_solana_account(
&account.processed_account,
slot_info.slot,
@ -187,70 +187,87 @@ impl ProcessedAccountStore {
break;
}
}
let mut return_vec = vec![];
return_vec.reserve(map_of_accounts.len());
let mut return_vec = Vec::with_capacity(map_of_accounts.len());
for (_, acc) in map_of_accounts.drain() {
return_vec.push(acc);
}
return_vec
}
pub async fn get_confirmed_account(
pub async fn get_confirmed_accounts(
&self,
account_pk: Pubkey,
account_pks: Vec<Pubkey>,
return_list: &mut [Option<ProcessedAccount>],
confirmed_slot: Slot,
) -> Option<ProcessedAccount> {
) {
let mut process_slot = Some(confirmed_slot);
let lk = self.processed_slot_accounts.read().await;
while let Some(current_slot) = process_slot {
match lk.get(&current_slot) {
Some(processed_accounts) => {
match processed_accounts.processed_accounts.get(&account_pk) {
Some(acc) => {
return Some(acc.clone());
for (index, account_pk) in account_pks.iter().enumerate() {
if return_list.get(index).unwrap().is_some() {
continue;
}
None => {
process_slot = processed_accounts.slot_parent;
if let Some(acc) = processed_accounts.processed_accounts.get(account_pk) {
*return_list.get_mut(index).unwrap() = Some(acc.clone());
}
}
if return_list.iter().all(|x| x.is_some()) {
break;
}
process_slot = processed_accounts.slot_parent;
}
None => {
break;
}
}
}
None
}
pub async fn get_account(
pub async fn get_accounts(
&self,
account_pk: Pubkey,
account_pks: Vec<Pubkey>,
commitment: Commitment,
confirmed_slot: Slot,
) -> Option<ProcessedAccount> {
) -> Vec<Option<ProcessedAccount>> {
let mut return_list = vec![None; account_pks.len()];
match commitment {
Commitment::Processed => {
let lk = self.processed_slot_accounts.read().await;
// iterate backwards on all the processed slots until we reach confirmed slot
for (slot, processed_account_slots) in lk.iter().rev() {
if *slot > confirmed_slot {
if let Some(processed_account) =
processed_account_slots.processed_accounts.get(&account_pk)
{
return Some(processed_account.clone());
} else {
continue;
for (index, account_pk) in account_pks.iter().enumerate() {
if return_list.get(index).unwrap().is_some() {
continue;
}
if let Some(processed_account) =
processed_account_slots.processed_accounts.get(account_pk)
{
*return_list.get_mut(index).unwrap() =
Some(processed_account.clone());
}
}
if return_list.iter().all(|x| x.is_some()) {
break;
}
} else {
drop(lk);
return self.get_confirmed_account(account_pk, confirmed_slot).await;
self.get_confirmed_accounts(account_pks, &mut return_list, confirmed_slot)
.await;
return return_list;
}
}
None
}
Commitment::Confirmed => self.get_confirmed_account(account_pk, confirmed_slot).await,
Commitment::Confirmed => {
self.get_confirmed_accounts(account_pks, &mut return_list, confirmed_slot)
.await
}
Commitment::Finalized => unreachable!(),
}
};
return_list
}
}
@ -269,6 +286,7 @@ pub struct InMemoryTokenStorage {
}
impl InMemoryTokenStorage {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self {
mints_by_index: Arc::new(DashMap::new()),
@ -374,8 +392,7 @@ impl InMemoryTokenStorage {
}
pub fn is_token_program_account(&self, account_data: &AccountData) -> bool {
if self
.account_index_by_pubkey
self.account_index_by_pubkey
.contains_key(&account_data.pubkey)
|| self
.mints_index_by_pubkey
@ -383,14 +400,9 @@ impl InMemoryTokenStorage {
|| self.multisigs.contains_key(&account_data.pubkey)
|| account_data.account.owner == spl_token::id()
|| account_data.account.owner == spl_token_2022::id()
{
true
} else {
false
}
}
pub async fn get_account(&self, account_pk: Pubkey) -> Option<AccountData> {
pub async fn get_account_finalized(&self, account_pk: Pubkey) -> Option<AccountData> {
if let Some(multisig) = self.multisigs.get(&account_pk) {
return Some(token_multisig_to_solana_account(
&multisig,
@ -404,7 +416,7 @@ impl InMemoryTokenStorage {
let mint_data = self.mints_by_index.get(&mint_index).unwrap();
if let Some(mint_account) = &mint_data.mint_account {
return Some(token_mint_to_solana_account(
&mint_account,
mint_account,
self.finalized_slot.load(Ordering::Relaxed),
0,
));
@ -418,15 +430,90 @@ impl InMemoryTokenStorage {
let token_account_index = *ite as usize;
let lk = self.token_accounts.read().await;
let token_account = lk.get(token_account_index).unwrap();
return Some(token_account_to_solana_account(
if token_account.lamports == 0 {
return None;
}
return token_account_to_solana_account(
token_account,
self.finalized_slot.load(Ordering::Relaxed),
0,
&self.mints_by_index,
));
);
}
None
}
pub async fn get_program_accounts_finalized(
&self,
program_pubkey: Pubkey,
account_filters: Option<Vec<AccountFilterType>>,
) -> Result<Vec<AccountData>, AccountLoadingError> {
let finalized_slot = self.finalized_slot.load(Ordering::Relaxed);
if let Some(account_filters) = account_filters {
let (owner, mint) = get_spl_token_owner_mint_filter(&program_pubkey, &account_filters);
if let Some(owner) = owner {
match self.account_by_owner_pubkey.get(&owner) {
Some(token_acc_indexes) => {
let lk = self.token_accounts.read().await;
let indexes = token_acc_indexes.value();
let mint =
mint.map(|pk| *self.mints_index_by_pubkey.get(&pk).unwrap().value());
let token_accounts = indexes
.iter()
.filter_map(|index| lk.get(*index as usize))
.filter(|acc| {
// filter by mint if necessary
if let Some(mint) = mint {
acc.mint == mint
} else {
true
}
})
.filter_map(|token_account| {
token_account_to_solana_account(
token_account,
finalized_slot,
0,
&self.mints_by_index,
)
})
.collect_vec();
Ok(token_accounts)
}
None => Ok(vec![]),
}
} else if let Some(mint) = mint {
match self.mints_index_by_pubkey.get(&mint) {
Some(mint_index) => match self.accounts_index_by_mint.get(mint_index.value()) {
Some(token_acc_indexes) => {
let indexes = token_acc_indexes.value();
let lk = self.token_accounts.read().await;
let token_accounts = indexes
.iter()
.filter_map(|index| lk.get(*index as usize))
.filter_map(|token_account| {
token_account_to_solana_account(
token_account,
finalized_slot,
0,
&self.mints_by_index,
)
})
.collect_vec();
Ok(token_accounts)
}
None => Ok(vec![]),
},
None => Ok(vec![]),
}
} else {
Err(AccountLoadingError::ShouldContainAnAccountFilter)
}
} else {
Err(AccountLoadingError::ShouldContainAnAccountFilter)
}
}
}
#[async_trait]
@ -490,23 +577,25 @@ impl AccountStorageInterface for InMemoryTokenStorage {
Commitment::Confirmed | Commitment::Processed => {
match self
.processed_storage
.get_account(
account_pk,
.get_accounts(
vec![account_pk],
commitment,
self.confirmed_slot.load(Ordering::Relaxed),
)
.await
.get(0)
.unwrap()
{
Some(processed_account) => Ok(Some(token_program_account_to_solana_account(
Some(processed_account) => Ok(token_program_account_to_solana_account(
&processed_account.processed_account,
self.confirmed_slot.load(Ordering::Relaxed),
0,
&self.mints_by_index,
))),
None => Ok(self.get_account(account_pk).await),
)),
None => Ok(self.get_account_finalized(account_pk).await),
}
}
Commitment::Finalized => Ok(self.get_account(account_pk).await),
Commitment::Finalized => Ok(self.get_account_finalized(account_pk).await),
}
}
@ -514,30 +603,50 @@ impl AccountStorageInterface for InMemoryTokenStorage {
&self,
program_pubkey: Pubkey,
account_filters: Option<Vec<AccountFilterType>>,
_commitment: Commitment,
commitment: Commitment,
) -> Result<Vec<AccountData>, AccountLoadingError> {
if program_pubkey != spl_token::id() && program_pubkey != spl_token_2022::id() {
return Err(AccountLoadingError::WrongIndex);
}
if let Some(account_filters) = account_filters {
let (owner, mint) = get_spl_token_owner_mint_filter(&program_pubkey, &account_filters);
if let Some(owner) = owner {
match self.account_by_owner_pubkey.get(&owner) {
Some(token_acc_indexes) => todo!(),
None => Ok(vec![]),
// get all accounts for finalized commitment and update them if necessary
let mut account_data = self
.get_program_accounts_finalized(program_pubkey, account_filters)
.await?;
let confirmed_slot = self.confirmed_slot.load(Ordering::Relaxed);
if commitment == Commitment::Processed || commitment == Commitment::Confirmed {
// get list of all pks to search
let pks = account_data.iter().map(|x| x.pubkey).collect_vec();
let processed_accounts = self
.processed_storage
.get_accounts(pks, commitment, confirmed_slot)
.await;
let mut to_remove = vec![];
for (index, processed_account) in processed_accounts.iter().enumerate() {
if let Some(processed_account) = processed_account {
let updated_account = token_program_account_to_solana_account(
&processed_account.processed_account,
confirmed_slot,
processed_account.write_version,
&self.mints_by_index,
);
match updated_account {
Some(updated_account) => {
// update with processed or confirmed account state
*account_data.get_mut(index).unwrap() = updated_account;
}
None => {
// account must have been deleted
to_remove.push(index);
}
}
}
} else if let Some(mint) = mint {
match self.mints_index_by_pubkey.get(&mint) {
Some(_) => todo!(),
None => Ok(vec![]),
}
} else {
Err(AccountLoadingError::ShouldContainAnAccountFilter)
}
} else {
Err(AccountLoadingError::ShouldContainAnAccountFilter)
for index in to_remove.iter().rev() {
account_data.remove(*index);
}
}
Ok(account_data)
}
async fn process_slot_data(
@ -562,7 +671,7 @@ impl AccountStorageInterface for InMemoryTokenStorage {
let finalized_accounts = self.processed_storage.finalize(slot_info.slot).await;
let accounts_notifications = finalized_accounts
.iter()
.map(|acc| {
.filter_map(|acc| {
token_program_account_to_solana_account(
&acc.processed_account,
slot_info.slot,

View File

@ -231,7 +231,10 @@ pub fn token_account_to_solana_account(
updated_slot: u64,
write_version: u64,
mints_by_index: &Arc<DashMap<u64, MintData>>,
) -> AccountData {
) -> Option<AccountData> {
if token_account.lamports == 0 {
return None;
}
let (delegate, delegated_amount) = token_account.delegate.unwrap_or_default();
let mint = mints_by_index.get(&token_account.mint).unwrap();
let data = match token_account.program {
@ -306,12 +309,12 @@ pub fn token_account_to_solana_account(
rent_epoch: u64::MAX,
data_length,
});
AccountData {
Some(AccountData {
pubkey: token_account.pubkey,
account,
updated_slot,
write_version,
}
})
}
pub fn token_mint_to_solana_account(
@ -434,18 +437,20 @@ pub fn token_program_account_to_solana_account(
updated_slot: u64,
write_version: u64,
mints_by_index: &Arc<DashMap<u64, MintData>>,
) -> AccountData {
) -> Option<AccountData> {
match token_program_account {
TokenProgramAccountType::TokenAccount(tok_acc) => {
token_account_to_solana_account(tok_acc, updated_slot, write_version, mints_by_index)
}
TokenProgramAccountType::Mint(mint_account) => {
token_mint_to_solana_account(mint_account, updated_slot, write_version)
}
TokenProgramAccountType::MultiSig(multisig, pubkey) => {
token_multisig_to_solana_account(multisig, *pubkey, updated_slot, write_version)
}
TokenProgramAccountType::Deleted(_) => unreachable!(),
TokenProgramAccountType::Mint(mint_account) => Some(token_mint_to_solana_account(
mint_account,
updated_slot,
write_version,
)),
TokenProgramAccountType::MultiSig(multisig, pubkey) => Some(
token_multisig_to_solana_account(multisig, *pubkey, updated_slot, write_version),
),
TokenProgramAccountType::Deleted(_) => None,
}
}
@ -492,8 +497,7 @@ pub fn get_spl_token_owner_mint_filter(
_ => {}
}
}
if data_size_filter == Some(account_packed_len as u64)
|| memcmp_filter == Some(&[ACCOUNTTYPE_ACCOUNT])
if data_size_filter == Some(account_packed_len) || memcmp_filter == Some(&[ACCOUNTTYPE_ACCOUNT])
{
if let Some(incorrect_owner_len) = incorrect_owner_len {
log::error!(