Using lz4 compression and compressing at account level

This commit is contained in:
godmodegalactus 2024-04-18 12:13:29 +02:00
parent 4da31ffbd3
commit b9ff810894
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
9 changed files with 323 additions and 21 deletions

1
Cargo.lock generated
View File

@ -6834,7 +6834,6 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util 0.6.9",
"zstd",
]
[[package]]

View File

@ -35,8 +35,14 @@ lazy_static! {
);
m.insert(*CONFIG_PROGRAM_ID, ParsableAccount::Config);
m.insert(*SYSTEM_PROGRAM_ID, ParsableAccount::Nonce);
m.insert(spl_token::id(), ParsableAccount::SplToken);
m.insert(spl_token_2022::id(), ParsableAccount::SplToken2022);
m.insert(
Pubkey::new_from_array(spl_token::id().to_bytes()),
ParsableAccount::SplToken,
);
m.insert(
Pubkey::new_from_array(spl_token_2022::id().to_bytes()),
ParsableAccount::SplToken2022,
);
m.insert(*STAKE_PROGRAM_ID, ParsableAccount::Stake);
m.insert(*SYSVAR_PROGRAM_ID, ParsableAccount::Sysvar);
m.insert(*VOTE_PROGRAM_ID, ParsableAccount::Vote);

View File

@ -18,12 +18,16 @@ use {
// Returns all known SPL Token program ids
pub fn spl_token_ids() -> Vec<Pubkey> {
vec![spl_token::id(), spl_token_2022::id()]
vec![
Pubkey::new_from_array(spl_token::id().to_bytes()),
Pubkey::new_from_array(spl_token_2022::id().to_bytes()),
]
}
// Check if the provided program id as a known SPL Token program id
pub fn is_known_spl_token_id(program_id: &Pubkey) -> bool {
*program_id == spl_token::id() || *program_id == spl_token_2022::id()
*program_id == Pubkey::new_from_array(spl_token::id().to_bytes())
|| *program_id == Pubkey::new_from_array(spl_token_2022::id().to_bytes())
}
// A helper function to convert spl_token::native_mint::id() as spl_sdk::pubkey::Pubkey to
@ -42,7 +46,7 @@ pub fn spl_token_native_mint() -> Pubkey {
note = "Pubkey conversions no longer needed. Please use spl_token::id() directly"
)]
pub fn spl_token_native_mint_program_id() -> Pubkey {
spl_token::id()
Pubkey::new_from_array(spl_token::id().to_bytes())
}
// A helper function to convert a solana_sdk::pubkey::Pubkey to spl_sdk::pubkey::Pubkey

View File

@ -56,7 +56,7 @@ use {
slot_hashes::SlotHashes,
sysvar::{self, instructions::construct_instructions_data},
transaction::{Result, SanitizedTransaction, TransactionAccountLocks, TransactionError},
transaction_context::{IndexOfAccount, TransactionAccount},
transaction_context::{IndexOfAccount, TransactionAccount, TransactionAccountCompressed},
},
solana_system_program::{get_system_account_kind, SystemAccountKind},
std::{
@ -964,6 +964,21 @@ impl Accounts {
}
}
fn load_while_filtering_compressed<F: Fn(&AccountSharedData) -> bool>(
collector: &mut Vec<TransactionAccountCompressed>,
some_account_tuple: Option<(&Pubkey, AccountSharedData, Slot)>,
filter: F,
) {
if let Some((pk, account)) = some_account_tuple
.filter(|(_, account, _)| Self::is_loadable(account.lamports()) && filter(account))
.map(|(pubkey, account, _slot)| (*pubkey, account))
{
let src = bincode::serialize(&account).unwrap();
let compressed = lz4::block::compress(&src, Some(lz4::block::CompressionMode::FAST(3)), true).unwrap();
collector.push((pk, compressed))
}
}
fn load_with_slot(
collector: &mut Vec<PubkeyAccountSlot>,
some_account_tuple: Option<(&Pubkey, AccountSharedData, Slot)>,
@ -1021,6 +1036,29 @@ impl Accounts {
.map(|_| collector)
}
pub fn load_by_program_with_filter_compressed<F: Fn(&AccountSharedData) -> bool>(
&self,
ancestors: &Ancestors,
bank_id: BankId,
program_id: &Pubkey,
filter: F,
config: &ScanConfig,
) -> ScanResult<Vec<TransactionAccountCompressed>> {
let mut collector = Vec::new();
self.accounts_db
.scan_accounts(
ancestors,
bank_id,
|some_account_tuple| {
Self::load_while_filtering_compressed(&mut collector, some_account_tuple, |account| {
account.owner() == program_id && filter(account)
})
},
config,
)
.map(|_| collector)
}
fn calc_scan_result_size(account: &AccountSharedData) -> usize {
account.data().len()
+ std::mem::size_of::<AccountSharedData>()
@ -1057,6 +1095,19 @@ impl Accounts {
}
}
fn maybe_abort_scan_compressed(
result: ScanResult<Vec<TransactionAccountCompressed>>,
config: &ScanConfig,
) -> ScanResult<Vec<TransactionAccountCompressed>> {
if config.is_aborted() {
ScanResult::Err(ScanError::Aborted(
"The accumulated scan results exceeded the limit".to_string(),
))
} else {
result
}
}
pub fn load_by_index_key_with_filter<F: Fn(&AccountSharedData) -> bool>(
&self,
ancestors: &Ancestors,
@ -1110,6 +1161,59 @@ impl Accounts {
Self::maybe_abort_scan(result, &config)
}
pub fn load_by_index_key_with_filter_compressed<F: Fn(&AccountSharedData) -> bool>(
&self,
ancestors: &Ancestors,
bank_id: BankId,
index_key: &IndexKey,
filter: F,
config: &ScanConfig,
byte_limit_for_scan: Option<usize>,
just_get_program_ids: bool,
) -> ScanResult<Vec<TransactionAccountCompressed>> {
let sum = AtomicUsize::default();
let config = config.recreate_with_abort();
let mut collector = Vec::new();
let result = self
.accounts_db
.index_scan_accounts(
ancestors,
bank_id,
*index_key,
|some_account_tuple| {
Self::load_while_filtering_compressed(&mut collector, some_account_tuple, |account| {
if just_get_program_ids {
if Self::accumulate_and_check_scan_result_size(
&sum,
account,
&byte_limit_for_scan,
) {
config.abort();
}
true
} else {
let use_account = filter(account);
if use_account
&& Self::accumulate_and_check_scan_result_size(
&sum,
account,
&byte_limit_for_scan,
)
{
// total size of results exceeds size limit, so abort scan
config.abort();
}
use_account
}
});
},
&config,
just_get_program_ids,
)
.map(|_| collector);
Self::maybe_abort_scan_compressed(result, &config)
}
pub fn account_indexes_include_key(&self, key: &Pubkey) -> bool {
self.accounts_db.account_indexes.include_key(key)
}

View File

@ -59,7 +59,6 @@ stream-cancel = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true, features = ["codec", "compat"] }
zstd = {workspace = "true"}
[dev-dependencies]
serial_test = { workspace = true }

View File

@ -552,11 +552,11 @@ impl JsonRpcRequestProcessor {
optimize_filters(&mut filters);
let keyed_accounts = {
if let Some(owner) = get_spl_token_owner_filter(program_id, &filters) {
self.get_filtered_spl_token_accounts_by_owner(&bank, program_id, &owner, filters)?
self.get_filtered_spl_token_accounts_by_owner_compressed(&bank, program_id, &owner, filters)?
} else if let Some(mint) = get_spl_token_mint_filter(program_id, &filters) {
self.get_filtered_spl_token_accounts_by_mint(&bank, program_id, &mint, filters)?
self.get_filtered_spl_token_accounts_by_mint_compressed(&bank, program_id, &mint, filters)?
} else {
self.get_filtered_program_accounts(
self.get_filtered_program_accounts_compressed(
&bank,
program_id,
filters,
@ -569,16 +569,9 @@ impl JsonRpcRequestProcessor {
let compressed_list: Vec<RpcKeyedCompressedAccount> = keyed_accounts
.iter()
.map(|(pubkey, account)| {
let binary_account = bincode::serialize(&account)
.map_err(|_| Error::internal_error())
.unwrap();
let compressed_binary = zstd::bulk::compress(&binary_account, 0)
.map_err(|_| Error::internal_error())
.unwrap();
drop(binary_account);
RpcKeyedCompressedAccount {
p: pubkey.to_string(),
a: BASE64_STANDARD.encode(compressed_binary),
a: BASE64_STANDARD.encode(account),
}
})
.collect_vec();
@ -2133,6 +2126,66 @@ impl JsonRpcRequestProcessor {
}
}
fn get_filtered_program_accounts_compressed(
&self,
bank: &Bank,
program_id: &Pubkey,
mut filters: Vec<RpcFilterType>,
just_get_program_ids: bool,
ordered: bool,
) -> RpcCustomResult<Vec<(Pubkey, Vec<u8>)>> {
optimize_filters(&mut filters);
let filter_closure = |account: &AccountSharedData| {
filters
.iter()
.all(|filter_type| filter_type.allows(account))
};
if self
.config
.account_indexes
.contains(&AccountIndex::ProgramId)
{
if !self.config.account_indexes.include_key(program_id) {
return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
index_key: program_id.to_string(),
});
}
Ok(bank
.get_filtered_indexed_accounts_compressed(
&IndexKey::ProgramId(*program_id),
|account| {
// The program-id account index checks for Account owner on inclusion. However, due
// to the current AccountsDb implementation, an account may remain in storage as a
// zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later
// updates. We include the redundant filters here to avoid returning these
// accounts.
if just_get_program_ids {
true
} else {
account.owner() == program_id && filter_closure(account)
}
},
&ScanConfig::new(!ordered),
bank.byte_limit_for_scans(),
just_get_program_ids,
)
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?)
} else {
// this path does not need to provide a mb limit because we only want to support secondary indexes
Ok(bank
.get_filtered_program_accounts_compressed(
program_id,
filter_closure,
&ScanConfig::new(!ordered),
)
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?)
}
}
fn get_program_addresses_for_bank(
&self,
bank: &Bank,
@ -2227,6 +2280,57 @@ impl JsonRpcRequestProcessor {
}
}
fn get_filtered_spl_token_accounts_by_owner_compressed(
&self,
bank: &Bank,
program_id: &Pubkey,
owner_key: &Pubkey,
mut filters: Vec<RpcFilterType>,
) -> RpcCustomResult<Vec<(Pubkey, Vec<u8>)>> {
// The by-owner accounts index checks for Token Account state and Owner address on
// inclusion. However, due to the current AccountsDb implementation, an account may remain
// in storage as a zero-lamport AccountSharedData::Default() after being wiped and reinitialized in
// later updates. We include the redundant filters here to avoid returning these accounts.
//
// Filter on Token Account state
filters.push(RpcFilterType::TokenAccountState);
// Filter on Owner address
filters.push(RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
SPL_TOKEN_ACCOUNT_OWNER_OFFSET,
owner_key.to_bytes().into(),
)));
if self
.config
.account_indexes
.contains(&AccountIndex::SplTokenOwner)
{
if !self.config.account_indexes.include_key(owner_key) {
return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
index_key: owner_key.to_string(),
});
}
Ok(bank
.get_filtered_indexed_accounts_compressed(
&IndexKey::SplTokenOwner(*owner_key),
|account| {
account.owner() == program_id
&& filters
.iter()
.all(|filter_type| filter_type.allows(account))
},
&ScanConfig::default(),
bank.byte_limit_for_scans(),
false,
)
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?)
} else {
self.get_filtered_program_accounts_compressed(bank, program_id, filters, false, false)
}
}
/// Get an iterator of spl-token accounts by mint address
fn get_filtered_spl_token_accounts_by_mint(
&self,
@ -2278,6 +2382,56 @@ impl JsonRpcRequestProcessor {
}
}
fn get_filtered_spl_token_accounts_by_mint_compressed(
&self,
bank: &Bank,
program_id: &Pubkey,
mint_key: &Pubkey,
mut filters: Vec<RpcFilterType>,
) -> RpcCustomResult<Vec<(Pubkey, Vec<u8>)>> {
// The by-mint accounts index checks for Token Account state and Mint address on inclusion.
// However, due to the current AccountsDb implementation, an account may remain in storage
// as be zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later
// updates. We include the redundant filters here to avoid returning these accounts.
//
// Filter on Token Account state
filters.push(RpcFilterType::TokenAccountState);
// Filter on Mint address
filters.push(RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
SPL_TOKEN_ACCOUNT_MINT_OFFSET,
mint_key.to_bytes().into(),
)));
if self
.config
.account_indexes
.contains(&AccountIndex::SplTokenMint)
{
if !self.config.account_indexes.include_key(mint_key) {
return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
index_key: mint_key.to_string(),
});
}
Ok(bank
.get_filtered_indexed_accounts_compressed(
&IndexKey::SplTokenMint(*mint_key),
|account| {
account.owner() == program_id
&& filters
.iter()
.all(|filter_type| filter_type.allows(account))
},
&ScanConfig::default(),
bank.byte_limit_for_scans(),
false,
)
.map_err(|e| RpcCustomError::ScanError {
message: e.to_string(),
})?)
} else {
self.get_filtered_program_accounts_compressed(bank, program_id, filters, false, false)
}
}
fn get_latest_blockhash(&self, config: RpcContextConfig) -> Result<RpcResponse<RpcBlockhash>> {
let bank = self.get_bank_with_config(config)?;
let blockhash = bank.last_blockhash();

View File

@ -36,6 +36,7 @@
#[allow(deprecated)]
use solana_sdk::recent_blockhashes_account;
pub use solana_sdk::reward_type::RewardType;
use solana_sdk::transaction_context::TransactionAccountCompressed;
use {
crate::{
bank::metrics::*,
@ -6812,6 +6813,21 @@ impl Bank {
)
}
pub fn get_filtered_program_accounts_compressed<F: Fn(&AccountSharedData) -> bool>(
&self,
program_id: &Pubkey,
filter: F,
config: &ScanConfig,
) -> ScanResult<Vec<TransactionAccountCompressed>> {
self.rc.accounts.load_by_program_with_filter_compressed(
&self.ancestors,
self.bank_id,
program_id,
filter,
config,
)
}
pub fn get_filtered_indexed_accounts<F: Fn(&AccountSharedData) -> bool>(
&self,
index_key: &IndexKey,
@ -6831,6 +6847,25 @@ impl Bank {
)
}
pub fn get_filtered_indexed_accounts_compressed<F: Fn(&AccountSharedData) -> bool>(
&self,
index_key: &IndexKey,
filter: F,
config: &ScanConfig,
byte_limit_for_scan: Option<usize>,
just_get_program_ids: bool,
) -> ScanResult<Vec<TransactionAccountCompressed>> {
self.rc.accounts.load_by_index_key_with_filter_compressed(
&self.ancestors,
self.bank_id,
index_key,
filter,
config,
byte_limit_for_scan,
just_get_program_ids,
)
}
pub fn account_indexes_include_key(&self, key: &Pubkey) -> bool {
self.rc.accounts.account_indexes_include_key(key)
}

View File

@ -55,6 +55,7 @@ pub struct InstructionAccount {
/// An account key and the matching account
pub type TransactionAccount = (Pubkey, AccountSharedData);
pub type TransactionAccountCompressed = (Pubkey, Vec<u8>);
#[derive(Clone, Debug, PartialEq)]
pub struct TransactionAccounts {

View File

@ -822,12 +822,12 @@ fn check_num_token_accounts(accounts: &[u8], num: usize) -> Result<(), ParseInst
#[deprecated(since = "1.16.0", note = "Instruction conversions no longer needed")]
pub fn spl_token_instruction(instruction: SplTokenInstruction) -> Instruction {
Instruction {
program_id: instruction.program_id,
program_id: solana_sdk::pubkey::Pubkey::new_from_array(instruction.program_id.to_bytes()),
accounts: instruction
.accounts
.iter()
.map(|meta| AccountMeta {
pubkey: meta.pubkey,
pubkey: solana_sdk::pubkey::Pubkey::new_from_array(meta.pubkey.to_bytes()),
is_signer: meta.is_signer,
is_writable: meta.is_writable,
})