Saving compressed accounts in Inmemory, adding lz4 and zstd compression methods

This commit is contained in:
godmodegalactus 2024-04-19 16:17:02 +02:00
parent c4009a2763
commit f54a2aad31
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
11 changed files with 166 additions and 109 deletions

2
Cargo.lock generated
View File

@ -4688,6 +4688,7 @@ dependencies = [
"futures",
"itertools 0.10.5",
"log",
"lz4",
"prometheus",
"quinn",
"rand 0.8.5",
@ -4706,6 +4707,7 @@ dependencies = [
"solana-version",
"thiserror",
"tokio",
"zstd 0.11.2+zstd.1.5.2",
]
[[package]]

View File

@ -94,3 +94,5 @@ geyser-grpc-connector = { tag = "v0.10.6+yellowstone.1.13+solana.1.17.28", git =
async-trait = "0.1.68"
tonic-health = "0.10"
lz4 = "1.24.0"
zstd = "0.11.2"

View File

@ -9,11 +9,7 @@ use dashmap::DashSet;
use futures::lock::Mutex;
use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_client::{
nonblocking::rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_filter::RpcFilterType,
};
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_filter::RpcFilterType};
use solana_lite_rpc_accounts::account_store_interface::{
AccountLoadingError, AccountStorageInterface,
};
@ -21,8 +17,8 @@ use solana_lite_rpc_cluster_endpoints::geyser_grpc_connector::GrpcSourceConfig;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
structures::{
account_data::{AccountData, AccountNotificationMessage},
account_filter::{AccountFilter, AccountFilterType, AccountFilters},
account_data::{Account, AccountData, AccountNotificationMessage, CompressionMethod},
account_filter::{AccountFilter, AccountFilters},
},
};
use solana_sdk::{clock::Slot, pubkey::Pubkey};
@ -173,7 +169,10 @@ impl AccountStorageInterface for AccountsOnDemand {
// update account in storage and return the account data
let account_data = AccountData {
pubkey: account_pk,
account: Arc::new(account),
account: Arc::new(Account::from_solana_account(
account,
CompressionMethod::Lz4(1),
)),
updated_slot: response.context.slot,
};
self.accounts_storage
@ -217,78 +216,10 @@ impl AccountStorageInterface for AccountsOnDemand {
filters: Option<Vec<RpcFilterType>>,
commitment: Commitment,
) -> Option<Vec<AccountData>> {
match self
.accounts_storage
// accounts on demand will not fetch gPA if they do not exist
self.accounts_storage
.get_program_accounts(program_pubkey, filters.clone(), commitment)
.await
{
Some(accounts) => {
// filter is already present
Some(accounts)
}
None => {
// check if filter is already present
let current_filters = self.get_filters().await;
let account_filter = AccountFilter {
accounts: vec![],
program_id: Some(program_pubkey.to_string()),
filters: filters
.clone()
.map(|v| v.iter().map(AccountFilterType::from).collect()),
};
if current_filters.contains(&account_filter) {
// filter already exisits / there is no account data
return None;
}
// add into filters
{
let mut writelk = self.program_filters.write().await;
writelk.push(account_filter.clone());
}
self.refresh_subscription().await;
let rpc_response = self
.rpc_client
.get_program_accounts_with_config(
&program_pubkey,
RpcProgramAccountsConfig {
filters,
account_config: RpcAccountInfoConfig {
encoding: Some(solana_account_decoder::UiAccountEncoding::Base64),
data_slice: None,
commitment: Some(commitment.into_commiment_config()),
min_context_slot: None,
},
with_context: None,
},
)
.await;
match rpc_response {
Ok(program_accounts) => {
let program_accounts = program_accounts
.iter()
.map(|(pk, account)| AccountData {
pubkey: *pk,
account: Arc::new(account.clone()),
updated_slot: 0,
})
.collect_vec();
// add fetched accounts into cache
for account_data in &program_accounts {
self.accounts_storage
.update_account(account_data.clone(), commitment)
.await;
}
Some(program_accounts)
}
Err(e) => {
log::warn!("Got error while getting program accounts with {e:?}");
None
}
}
}
}
}
async fn process_slot_data(

View File

@ -43,8 +43,8 @@ lazy_static = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-util = { workspace = true }
zstd = "0.11.2"
lz4 = "1.24.0"
zstd = { workspace = true }
lz4 = { workspace = true }
[dev-dependencies]
rand = "0.8.5"

View File

@ -174,7 +174,7 @@ impl AccountService {
let data_slice = config.as_ref().map(|c| c.data_slice).unwrap_or_default();
UiAccount::encode(
&account_data.pubkey,
account_data.account.as_ref(),
&account_data.account.to_solana_account(),
encoding,
None,
data_slice,

View File

@ -11,10 +11,13 @@ use solana_client::{
};
use solana_lite_rpc_core::{
encoding::BASE64,
structures::{account_data::AccountData, account_filter::AccountFilters},
structures::{
account_data::{Account, AccountData, CompressionMethod},
account_filter::AccountFilters,
},
};
use solana_sdk::{
account::{Account, AccountSharedData, ReadableAccount},
account::{Account as SolanaAccount, AccountSharedData, ReadableAccount},
commitment_config::CommitmentConfig,
pubkey::Pubkey,
};
@ -85,21 +88,27 @@ pub async fn get_program_account(
for key_account in response.value {
let base64_decoded = BASE64.decode(&key_account.a)?;
// 64MB limit
// decompress all the account information
let uncompressed = lz4::block::decompress(&base64_decoded, None)?;
let shared_data =
bincode::deserialize::<AccountSharedData>(&uncompressed)?;
let account = Account {
let account = SolanaAccount {
lamports: shared_data.lamports(),
data: shared_data.data().to_vec(),
owner: *shared_data.owner(),
executable: shared_data.executable(),
rent_epoch: shared_data.rent_epoch(),
};
// compress just account_data
account_store
.initilize_or_update_account(AccountData {
pubkey: Pubkey::from_str(&key_account.p)?,
account: Arc::new(account),
account: Arc::new(Account::from_solana_account(
account,
CompressionMethod::Lz4(1),
)),
updated_slot,
})
.await;
@ -168,12 +177,15 @@ pub async fn get_program_account(
}
}
}
for (index, account) in fetch_accounts.iter().enumerate() {
for (index, account) in fetch_accounts.drain(0..).enumerate() {
if let Some(account) = account {
account_store
.initilize_or_update_account(AccountData {
pubkey: accounts[index],
account: Arc::new(account.clone()),
account: Arc::new(Account::from_solana_account(
account,
CompressionMethod::Lz4(1),
)),
updated_slot,
})
.await;

View File

@ -438,9 +438,10 @@ mod tests {
use itertools::Itertools;
use rand::{rngs::ThreadRng, Rng};
use solana_lite_rpc_core::{
commitment_utils::Commitment, structures::account_data::AccountData,
commitment_utils::Commitment,
structures::account_data::{Account, AccountData},
};
use solana_sdk::{account::Account, pubkey::Pubkey, slot_history::Slot};
use solana_sdk::{account::Account as SolanaAccount, pubkey::Pubkey, slot_history::Slot};
use crate::{
account_store_interface::AccountStorageInterface,
@ -454,15 +455,19 @@ mod tests {
program: Pubkey,
) -> AccountData {
let length: usize = rng.gen_range(100..1000);
let sol_account = SolanaAccount {
lamports: rng.gen(),
data: (0..length).map(|_| rng.gen::<u8>()).collect_vec(),
owner: program,
executable: false,
rent_epoch: 0,
};
AccountData {
pubkey,
account: Arc::new(Account {
lamports: rng.gen(),
data: (0..length).map(|_| rng.gen::<u8>()).collect_vec(),
owner: program,
executable: false,
rent_epoch: 0,
}),
account: Arc::new(Account::from_solana_account(
sol_account,
solana_lite_rpc_core::structures::account_data::CompressionMethod::None,
)),
updated_slot,
}
}

View File

@ -10,7 +10,7 @@ kill_timeout = 5
PORT_WS = "8891"
RUST_LOG = "info"
ENABLE_ADDRESS_LOOKUP_TABLES = "true"
ACCOUNT_FILTERS = "[{\"accounts\":[],\"programId\":\"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg\",\"filters\":null},{\"accounts\":[],\"programId\":\"3BUZXy9mPcsSCoxJQiBu2xxpMP6HEvFMZbaL5CAWwLUf\",\"filters\":null},{\"accounts\":[],\"programId\":\"SBondMDrcV3K4kxZR1HNVT7osZxAHVHgYXL5Ze1oMUv\",\"filters\":null},{\"accounts\":[],\"programId\":\"SW1TCH7qEPTdLsDHRgPuMQjbQxKdH2aBStViMFnt64f\",\"filters\":null},{\"accounts\":[],\"programId\":\"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX\",\"filters\":null},{\"accounts\":[],\"programId\":\"opnb2LAfJYbRMAHHvqjCwQxanZn7ReEHp1k81EohpZb\",\"filters\":null},{\"accounts\":[],\"programId\":\"PhoeNiXZ8ByJGLkxNfZRnkUfjvmuYqLR89jjFHGqdXY\",\"filters\":null},{\"accounts\":[],\"programId\":\"DjVE6JNiYqPL2QXyCUUh8rNjHrbz9hXHNYt99MQ59qw1\",\"filters\":null},{\"accounts\":[],\"programId\":\"9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP\",\"filters\":null},{\"accounts\":[],\"programId\":\"whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc\",\"filters\":null},{\"accounts\":[],\"programId\":\"82yxjeMsvaURa4MbZZ7WZZHfobirZYkH1zF8fmeGtyaQ\",\"filters\":null},{\"accounts\":[],\"programId\":\"CAMMCzo5YL8w4VFF8KVHrK22GGUsp5VTaW7grrKgrWqK\",\"filters\":null},{\"accounts\":[],\"programId\":\"675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8\",\"filters\":null},{\"accounts\":[],\"programId\":\"LBUZKhRxPF3XUpBCjp4YzTKgLccjZhTSDM9YuVaPwxo\",\"filters\":null},{\"accounts\":[],\"programId\":\"24Uqj9JCLxUeoC3hGfh5W3s9FM9uCHDS2SG3LYwBpyTi\",\"filters\":null},{\"accounts\":[],\"programId\":\"Eo7WjKq67rjJQSZxS6z3YkapzY3eMj6Xy8X5EQVn5UaB\",\"filters\":null},{\"accounts\":[],\"programId\":\"FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH\",\"filters\":null}]"
ACCOUNT_FILTERS = "[{\"accounts\":[],\"programId\":\"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX\",\"filters\":null},{\"accounts\":[],\"programId\":\"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg\",\"filters\":null},{\"accounts\":[],\"programId\":\"3BUZXy9mPcsSCoxJQiBu2xxpMP6HEvFMZbaL5CAWwLUf\",\"filters\":null},{\"accounts\":[],\"programId\":\"SBondMDrcV3K4kxZR1HNVT7osZxAHVHgYXL5Ze1oMUv\",\"filters\":null},{\"accounts\":[],\"programId\":\"SW1TCH7qEPTdLsDHRgPuMQjbQxKdH2aBStViMFnt64f\",\"filters\":null},{\"accounts\":[],\"programId\":\"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX\",\"filters\":null},{\"accounts\":[],\"programId\":\"opnb2LAfJYbRMAHHvqjCwQxanZn7ReEHp1k81EohpZb\",\"filters\":null},{\"accounts\":[],\"programId\":\"PhoeNiXZ8ByJGLkxNfZRnkUfjvmuYqLR89jjFHGqdXY\",\"filters\":null},{\"accounts\":[],\"programId\":\"DjVE6JNiYqPL2QXyCUUh8rNjHrbz9hXHNYt99MQ59qw1\",\"filters\":null},{\"accounts\":[],\"programId\":\"9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP\",\"filters\":null},{\"accounts\":[],\"programId\":\"whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc\",\"filters\":null},{\"accounts\":[],\"programId\":\"82yxjeMsvaURa4MbZZ7WZZHfobirZYkH1zF8fmeGtyaQ\",\"filters\":null},{\"accounts\":[],\"programId\":\"CAMMCzo5YL8w4VFF8KVHrK22GGUsp5VTaW7grrKgrWqK\",\"filters\":null},{\"accounts\":[],\"programId\":\"675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8\",\"filters\":null},{\"accounts\":[],\"programId\":\"LBUZKhRxPF3XUpBCjp4YzTKgLccjZhTSDM9YuVaPwxo\",\"filters\":null},{\"accounts\":[],\"programId\":\"24Uqj9JCLxUeoC3hGfh5W3s9FM9uCHDS2SG3LYwBpyTi\",\"filters\":null},{\"accounts\":[],\"programId\":\"Eo7WjKq67rjJQSZxS6z3YkapzY3eMj6Xy8X5EQVn5UaB\",\"filters\":null},{\"accounts\":[],\"programId\":\"FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH\",\"filters\":null}]"
ENABLE_ACCOUNT_ON_DEMAND = "true"
ENABLE_SMART_ACCOUNTS_WARMUP = "true"
PG_ENABLED = "false"

View File

@ -14,12 +14,12 @@ use itertools::Itertools;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
structures::{
account_data::{AccountData, AccountNotificationMessage},
account_data::{Account, AccountData, AccountNotificationMessage},
account_filter::{AccountFilterType, AccountFilters, MemcmpFilterData},
},
AnyhowJoinHandle,
};
use solana_sdk::{account::Account, pubkey::Pubkey};
use solana_sdk::{account::Account as SolanaAccount, pubkey::Pubkey};
use tokio::sync::Notify;
use yellowstone_grpc_proto::geyser::{
subscribe_request_filter_accounts_filter::Filter,
@ -177,16 +177,19 @@ pub fn start_account_streaming_tasks(
.owner
.try_into()
.expect("owner pubkey should be deserializable");
let solana_account = SolanaAccount {
lamports: account_data.lamports,
data: account_data.data,
owner: Pubkey::new_from_array(owner),
executable: account_data.executable,
rent_epoch: account_data.rent_epoch,
};
let notification = AccountNotificationMessage {
data: AccountData {
pubkey: Pubkey::new_from_array(account_pk_bytes),
account: Arc::new(Account {
lamports: account_data.lamports,
data: account_data.data,
owner: Pubkey::new_from_array(owner),
executable: account_data.executable,
rent_epoch: account_data.rent_epoch,
}),
account: Arc::new(Account::from_solana_account(solana_account, solana_lite_rpc_core::structures::account_data::CompressionMethod::Lz4(8))),
updated_slot: account.slot,
},
// TODO update with processed commitment / check above

View File

@ -36,6 +36,8 @@ rustls = { workspace = true }
async-trait = { workspace = true }
itertools = { workspace = true }
prometheus = { workspace = true }
lz4 = { workspace = true }
zstd = { workspace = true }
[dev-dependencies]
rand = "0.8.5"

View File

@ -1,11 +1,111 @@
use std::sync::Arc;
use solana_rpc_client_api::filter::RpcFilterType;
use solana_sdk::{account::Account, pubkey::Pubkey, slot_history::Slot};
use solana_sdk::{account::Account as SolanaAccount, pubkey::Pubkey, slot_history::Slot};
use tokio::sync::broadcast::Receiver;
use crate::commitment_utils::Commitment;
// 64 MB
const MAX_ACCOUNT_SIZE: usize = 64 * 1024 * 1024;
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum CompressionMethod {
None,
Lz4(i32),
Zstd(i32),
}
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum Data {
Uncompressed(Vec<u8>),
Lz4 { binary: Vec<u8>, len: usize },
Zstd { binary: Vec<u8>, len: usize },
}
impl Data {
pub fn len(&self) -> usize {
match self {
Data::Uncompressed(d) => d.len(),
Data::Lz4 { len, .. } => *len,
Data::Zstd { len, .. } => *len,
}
}
pub fn is_empty(&self) -> bool {
match self {
Data::Uncompressed(d) => d.is_empty(),
Data::Lz4 { len, .. } => *len == 0,
Data::Zstd { len, .. } => *len == 0,
}
}
pub fn data(&self) -> Vec<u8> {
match self {
Data::Uncompressed(d) => d.clone(),
Data::Lz4 { binary, .. } => lz4::block::decompress(binary, None).unwrap(),
Data::Zstd { binary, .. } => zstd::bulk::decompress(binary, MAX_ACCOUNT_SIZE).unwrap(),
}
}
}
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Account {
/// lamports in the account
pub lamports: u64,
/// data held in this account
pub data: Data,
/// the program that owns this account. If executable, the program that loads this account.
pub owner: Pubkey,
/// this account's data contains a loaded program (and is now read-only)
pub executable: bool,
/// the epoch at which this account will next owe rent
pub rent_epoch: u64,
}
impl Account {
pub fn from_solana_account(
account: SolanaAccount,
compression_method: CompressionMethod,
) -> Self {
let data = match compression_method {
CompressionMethod::None => Data::Uncompressed(account.data),
CompressionMethod::Lz4(level) => {
let len = account.data.len();
let binary = lz4::block::compress(
&account.data,
Some(lz4::block::CompressionMode::FAST(level)),
true,
)
.unwrap();
Data::Lz4 { binary, len }
}
CompressionMethod::Zstd(level) => {
let len = account.data.len();
let binary = zstd::bulk::compress(&account.data, level).unwrap();
Data::Zstd { binary, len }
}
};
Self {
lamports: account.lamports,
data,
owner: account.owner,
executable: account.executable,
rent_epoch: account.rent_epoch,
}
}
pub fn to_solana_account(&self) -> SolanaAccount {
SolanaAccount {
lamports: self.lamports,
data: self.data.data(),
owner: self.owner,
executable: self.executable,
rent_epoch: self.rent_epoch,
}
}
}
#[derive(Clone, Debug)]
pub struct AccountData {
pub pubkey: Pubkey,
@ -17,7 +117,7 @@ impl AccountData {
pub fn allows(&self, filter: &RpcFilterType) -> bool {
match filter {
RpcFilterType::DataSize(size) => self.account.data.len() as u64 == *size,
RpcFilterType::Memcmp(compare) => compare.bytes_match(&self.account.data),
RpcFilterType::Memcmp(compare) => compare.bytes_match(&self.account.data.data()),
RpcFilterType::TokenAccountState => {
// todo
false