From f54a2aad3105f65c226d80bc0c4f878926dcf7c2 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Fri, 19 Apr 2024 16:17:02 +0200 Subject: [PATCH] Saving compressed accounts in Inmemory, adding lz4 and zstd compression methods --- Cargo.lock | 2 + Cargo.toml | 2 + accounts-on-demand/src/accounts_on_demand.rs | 87 ++------------- accounts/Cargo.toml | 4 +- accounts/src/account_service.rs | 2 +- accounts/src/get_program_account.rs | 26 +++-- accounts/src/inmemory_account_store.rs | 23 ++-- cd/lite-rpc-accounts.toml | 2 +- .../src/grpc/grpc_accounts_streaming.rs | 21 ++-- core/Cargo.toml | 2 + core/src/structures/account_data.rs | 104 +++++++++++++++++- 11 files changed, 166 insertions(+), 109 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca2be13d..338b05bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4688,6 +4688,7 @@ dependencies = [ "futures", "itertools 0.10.5", "log", + "lz4", "prometheus", "quinn", "rand 0.8.5", @@ -4706,6 +4707,7 @@ dependencies = [ "solana-version", "thiserror", "tokio", + "zstd 0.11.2+zstd.1.5.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 804105a7..8c74780c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,3 +94,5 @@ geyser-grpc-connector = { tag = "v0.10.6+yellowstone.1.13+solana.1.17.28", git = async-trait = "0.1.68" tonic-health = "0.10" +lz4 = "1.24.0" +zstd = "0.11.2" \ No newline at end of file diff --git a/accounts-on-demand/src/accounts_on_demand.rs b/accounts-on-demand/src/accounts_on_demand.rs index 7d782efe..ac3f86d7 100644 --- a/accounts-on-demand/src/accounts_on_demand.rs +++ b/accounts-on-demand/src/accounts_on_demand.rs @@ -9,11 +9,7 @@ use dashmap::DashSet; use futures::lock::Mutex; use itertools::Itertools; use prometheus::{opts, register_int_gauge, IntGauge}; -use solana_client::{ - nonblocking::rpc_client::RpcClient, - rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, - rpc_filter::RpcFilterType, -}; +use solana_client::{nonblocking::rpc_client::RpcClient, rpc_filter::RpcFilterType}; use solana_lite_rpc_accounts::account_store_interface::{ AccountLoadingError, AccountStorageInterface, }; @@ -21,8 +17,8 @@ use solana_lite_rpc_cluster_endpoints::geyser_grpc_connector::GrpcSourceConfig; use solana_lite_rpc_core::{ commitment_utils::Commitment, structures::{ - account_data::{AccountData, AccountNotificationMessage}, - account_filter::{AccountFilter, AccountFilterType, AccountFilters}, + account_data::{Account, AccountData, AccountNotificationMessage, CompressionMethod}, + account_filter::{AccountFilter, AccountFilters}, }, }; use solana_sdk::{clock::Slot, pubkey::Pubkey}; @@ -173,7 +169,10 @@ impl AccountStorageInterface for AccountsOnDemand { // update account in storage and return the account data let account_data = AccountData { pubkey: account_pk, - account: Arc::new(account), + account: Arc::new(Account::from_solana_account( + account, + CompressionMethod::Lz4(1), + )), updated_slot: response.context.slot, }; self.accounts_storage @@ -217,78 +216,10 @@ impl AccountStorageInterface for AccountsOnDemand { filters: Option>, commitment: Commitment, ) -> Option> { - match self - .accounts_storage + // accounts on demand will not fetch gPA if they do not exist + self.accounts_storage .get_program_accounts(program_pubkey, filters.clone(), commitment) .await - { - Some(accounts) => { - // filter is already present - Some(accounts) - } - None => { - // check if filter is already present - let current_filters = self.get_filters().await; - let account_filter = AccountFilter { - accounts: vec![], - program_id: Some(program_pubkey.to_string()), - filters: filters - .clone() - .map(|v| v.iter().map(AccountFilterType::from).collect()), - }; - if current_filters.contains(&account_filter) { - // filter already exisits / there is no account data - return None; - } - - // add into filters - { - let mut writelk = self.program_filters.write().await; - writelk.push(account_filter.clone()); - } - self.refresh_subscription().await; - - let rpc_response = self - .rpc_client - .get_program_accounts_with_config( - &program_pubkey, - RpcProgramAccountsConfig { - filters, - account_config: RpcAccountInfoConfig { - encoding: Some(solana_account_decoder::UiAccountEncoding::Base64), - data_slice: None, - commitment: Some(commitment.into_commiment_config()), - min_context_slot: None, - }, - with_context: None, - }, - ) - .await; - match rpc_response { - Ok(program_accounts) => { - let program_accounts = program_accounts - .iter() - .map(|(pk, account)| AccountData { - pubkey: *pk, - account: Arc::new(account.clone()), - updated_slot: 0, - }) - .collect_vec(); - // add fetched accounts into cache - for account_data in &program_accounts { - self.accounts_storage - .update_account(account_data.clone(), commitment) - .await; - } - Some(program_accounts) - } - Err(e) => { - log::warn!("Got error while getting program accounts with {e:?}"); - None - } - } - } - } } async fn process_slot_data( diff --git a/accounts/Cargo.toml b/accounts/Cargo.toml index e8d9a2f5..e23bbf46 100644 --- a/accounts/Cargo.toml +++ b/accounts/Cargo.toml @@ -43,8 +43,8 @@ lazy_static = { workspace = true } solana-lite-rpc-core = { workspace = true } solana-lite-rpc-util = { workspace = true } -zstd = "0.11.2" -lz4 = "1.24.0" +zstd = { workspace = true } +lz4 = { workspace = true } [dev-dependencies] rand = "0.8.5" diff --git a/accounts/src/account_service.rs b/accounts/src/account_service.rs index 06457d11..469ff68c 100644 --- a/accounts/src/account_service.rs +++ b/accounts/src/account_service.rs @@ -174,7 +174,7 @@ impl AccountService { let data_slice = config.as_ref().map(|c| c.data_slice).unwrap_or_default(); UiAccount::encode( &account_data.pubkey, - account_data.account.as_ref(), + &account_data.account.to_solana_account(), encoding, None, data_slice, diff --git a/accounts/src/get_program_account.rs b/accounts/src/get_program_account.rs index 57b578fb..9bf5d044 100644 --- a/accounts/src/get_program_account.rs +++ b/accounts/src/get_program_account.rs @@ -11,10 +11,13 @@ use solana_client::{ }; use solana_lite_rpc_core::{ encoding::BASE64, - structures::{account_data::AccountData, account_filter::AccountFilters}, + structures::{ + account_data::{Account, AccountData, CompressionMethod}, + account_filter::AccountFilters, + }, }; use solana_sdk::{ - account::{Account, AccountSharedData, ReadableAccount}, + account::{Account as SolanaAccount, AccountSharedData, ReadableAccount}, commitment_config::CommitmentConfig, pubkey::Pubkey, }; @@ -85,21 +88,27 @@ pub async fn get_program_account( for key_account in response.value { let base64_decoded = BASE64.decode(&key_account.a)?; - // 64MB limit + // decompress all the account information let uncompressed = lz4::block::decompress(&base64_decoded, None)?; let shared_data = bincode::deserialize::(&uncompressed)?; - let account = Account { + let account = SolanaAccount { lamports: shared_data.lamports(), data: shared_data.data().to_vec(), owner: *shared_data.owner(), executable: shared_data.executable(), rent_epoch: shared_data.rent_epoch(), }; + + // compress just account_data + account_store .initilize_or_update_account(AccountData { pubkey: Pubkey::from_str(&key_account.p)?, - account: Arc::new(account), + account: Arc::new(Account::from_solana_account( + account, + CompressionMethod::Lz4(1), + )), updated_slot, }) .await; @@ -168,12 +177,15 @@ pub async fn get_program_account( } } } - for (index, account) in fetch_accounts.iter().enumerate() { + for (index, account) in fetch_accounts.drain(0..).enumerate() { if let Some(account) = account { account_store .initilize_or_update_account(AccountData { pubkey: accounts[index], - account: Arc::new(account.clone()), + account: Arc::new(Account::from_solana_account( + account, + CompressionMethod::Lz4(1), + )), updated_slot, }) .await; diff --git a/accounts/src/inmemory_account_store.rs b/accounts/src/inmemory_account_store.rs index 1f8247c7..c990b66d 100644 --- a/accounts/src/inmemory_account_store.rs +++ b/accounts/src/inmemory_account_store.rs @@ -438,9 +438,10 @@ mod tests { use itertools::Itertools; use rand::{rngs::ThreadRng, Rng}; use solana_lite_rpc_core::{ - commitment_utils::Commitment, structures::account_data::AccountData, + commitment_utils::Commitment, + structures::account_data::{Account, AccountData}, }; - use solana_sdk::{account::Account, pubkey::Pubkey, slot_history::Slot}; + use solana_sdk::{account::Account as SolanaAccount, pubkey::Pubkey, slot_history::Slot}; use crate::{ account_store_interface::AccountStorageInterface, @@ -454,15 +455,19 @@ mod tests { program: Pubkey, ) -> AccountData { let length: usize = rng.gen_range(100..1000); + let sol_account = SolanaAccount { + lamports: rng.gen(), + data: (0..length).map(|_| rng.gen::()).collect_vec(), + owner: program, + executable: false, + rent_epoch: 0, + }; AccountData { pubkey, - account: Arc::new(Account { - lamports: rng.gen(), - data: (0..length).map(|_| rng.gen::()).collect_vec(), - owner: program, - executable: false, - rent_epoch: 0, - }), + account: Arc::new(Account::from_solana_account( + sol_account, + solana_lite_rpc_core::structures::account_data::CompressionMethod::None, + )), updated_slot, } } diff --git a/cd/lite-rpc-accounts.toml b/cd/lite-rpc-accounts.toml index b948373f..893cc817 100644 --- a/cd/lite-rpc-accounts.toml +++ b/cd/lite-rpc-accounts.toml @@ -10,7 +10,7 @@ kill_timeout = 5 PORT_WS = "8891" RUST_LOG = "info" ENABLE_ADDRESS_LOOKUP_TABLES = "true" - ACCOUNT_FILTERS = "[{\"accounts\":[],\"programId\":\"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg\",\"filters\":null},{\"accounts\":[],\"programId\":\"3BUZXy9mPcsSCoxJQiBu2xxpMP6HEvFMZbaL5CAWwLUf\",\"filters\":null},{\"accounts\":[],\"programId\":\"SBondMDrcV3K4kxZR1HNVT7osZxAHVHgYXL5Ze1oMUv\",\"filters\":null},{\"accounts\":[],\"programId\":\"SW1TCH7qEPTdLsDHRgPuMQjbQxKdH2aBStViMFnt64f\",\"filters\":null},{\"accounts\":[],\"programId\":\"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX\",\"filters\":null},{\"accounts\":[],\"programId\":\"opnb2LAfJYbRMAHHvqjCwQxanZn7ReEHp1k81EohpZb\",\"filters\":null},{\"accounts\":[],\"programId\":\"PhoeNiXZ8ByJGLkxNfZRnkUfjvmuYqLR89jjFHGqdXY\",\"filters\":null},{\"accounts\":[],\"programId\":\"DjVE6JNiYqPL2QXyCUUh8rNjHrbz9hXHNYt99MQ59qw1\",\"filters\":null},{\"accounts\":[],\"programId\":\"9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP\",\"filters\":null},{\"accounts\":[],\"programId\":\"whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc\",\"filters\":null},{\"accounts\":[],\"programId\":\"82yxjeMsvaURa4MbZZ7WZZHfobirZYkH1zF8fmeGtyaQ\",\"filters\":null},{\"accounts\":[],\"programId\":\"CAMMCzo5YL8w4VFF8KVHrK22GGUsp5VTaW7grrKgrWqK\",\"filters\":null},{\"accounts\":[],\"programId\":\"675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8\",\"filters\":null},{\"accounts\":[],\"programId\":\"LBUZKhRxPF3XUpBCjp4YzTKgLccjZhTSDM9YuVaPwxo\",\"filters\":null},{\"accounts\":[],\"programId\":\"24Uqj9JCLxUeoC3hGfh5W3s9FM9uCHDS2SG3LYwBpyTi\",\"filters\":null},{\"accounts\":[],\"programId\":\"Eo7WjKq67rjJQSZxS6z3YkapzY3eMj6Xy8X5EQVn5UaB\",\"filters\":null},{\"accounts\":[],\"programId\":\"FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH\",\"filters\":null}]" + ACCOUNT_FILTERS = "[{\"accounts\":[],\"programId\":\"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX\",\"filters\":null},{\"accounts\":[],\"programId\":\"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg\",\"filters\":null},{\"accounts\":[],\"programId\":\"3BUZXy9mPcsSCoxJQiBu2xxpMP6HEvFMZbaL5CAWwLUf\",\"filters\":null},{\"accounts\":[],\"programId\":\"SBondMDrcV3K4kxZR1HNVT7osZxAHVHgYXL5Ze1oMUv\",\"filters\":null},{\"accounts\":[],\"programId\":\"SW1TCH7qEPTdLsDHRgPuMQjbQxKdH2aBStViMFnt64f\",\"filters\":null},{\"accounts\":[],\"programId\":\"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX\",\"filters\":null},{\"accounts\":[],\"programId\":\"opnb2LAfJYbRMAHHvqjCwQxanZn7ReEHp1k81EohpZb\",\"filters\":null},{\"accounts\":[],\"programId\":\"PhoeNiXZ8ByJGLkxNfZRnkUfjvmuYqLR89jjFHGqdXY\",\"filters\":null},{\"accounts\":[],\"programId\":\"DjVE6JNiYqPL2QXyCUUh8rNjHrbz9hXHNYt99MQ59qw1\",\"filters\":null},{\"accounts\":[],\"programId\":\"9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP\",\"filters\":null},{\"accounts\":[],\"programId\":\"whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc\",\"filters\":null},{\"accounts\":[],\"programId\":\"82yxjeMsvaURa4MbZZ7WZZHfobirZYkH1zF8fmeGtyaQ\",\"filters\":null},{\"accounts\":[],\"programId\":\"CAMMCzo5YL8w4VFF8KVHrK22GGUsp5VTaW7grrKgrWqK\",\"filters\":null},{\"accounts\":[],\"programId\":\"675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8\",\"filters\":null},{\"accounts\":[],\"programId\":\"LBUZKhRxPF3XUpBCjp4YzTKgLccjZhTSDM9YuVaPwxo\",\"filters\":null},{\"accounts\":[],\"programId\":\"24Uqj9JCLxUeoC3hGfh5W3s9FM9uCHDS2SG3LYwBpyTi\",\"filters\":null},{\"accounts\":[],\"programId\":\"Eo7WjKq67rjJQSZxS6z3YkapzY3eMj6Xy8X5EQVn5UaB\",\"filters\":null},{\"accounts\":[],\"programId\":\"FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH\",\"filters\":null}]" ENABLE_ACCOUNT_ON_DEMAND = "true" ENABLE_SMART_ACCOUNTS_WARMUP = "true" PG_ENABLED = "false" diff --git a/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs b/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs index 7713de61..c011f4d6 100644 --- a/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs +++ b/cluster-endpoints/src/grpc/grpc_accounts_streaming.rs @@ -14,12 +14,12 @@ use itertools::Itertools; use solana_lite_rpc_core::{ commitment_utils::Commitment, structures::{ - account_data::{AccountData, AccountNotificationMessage}, + account_data::{Account, AccountData, AccountNotificationMessage}, account_filter::{AccountFilterType, AccountFilters, MemcmpFilterData}, }, AnyhowJoinHandle, }; -use solana_sdk::{account::Account, pubkey::Pubkey}; +use solana_sdk::{account::Account as SolanaAccount, pubkey::Pubkey}; use tokio::sync::Notify; use yellowstone_grpc_proto::geyser::{ subscribe_request_filter_accounts_filter::Filter, @@ -177,16 +177,19 @@ pub fn start_account_streaming_tasks( .owner .try_into() .expect("owner pubkey should be deserializable"); + + let solana_account = SolanaAccount { + lamports: account_data.lamports, + data: account_data.data, + owner: Pubkey::new_from_array(owner), + executable: account_data.executable, + rent_epoch: account_data.rent_epoch, + }; + let notification = AccountNotificationMessage { data: AccountData { pubkey: Pubkey::new_from_array(account_pk_bytes), - account: Arc::new(Account { - lamports: account_data.lamports, - data: account_data.data, - owner: Pubkey::new_from_array(owner), - executable: account_data.executable, - rent_epoch: account_data.rent_epoch, - }), + account: Arc::new(Account::from_solana_account(solana_account, solana_lite_rpc_core::structures::account_data::CompressionMethod::Lz4(8))), updated_slot: account.slot, }, // TODO update with processed commitment / check above diff --git a/core/Cargo.toml b/core/Cargo.toml index dd62a55f..0652dd2b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -36,6 +36,8 @@ rustls = { workspace = true } async-trait = { workspace = true } itertools = { workspace = true } prometheus = { workspace = true } +lz4 = { workspace = true } +zstd = { workspace = true } [dev-dependencies] rand = "0.8.5" diff --git a/core/src/structures/account_data.rs b/core/src/structures/account_data.rs index e479e1f0..452be796 100644 --- a/core/src/structures/account_data.rs +++ b/core/src/structures/account_data.rs @@ -1,11 +1,111 @@ use std::sync::Arc; use solana_rpc_client_api::filter::RpcFilterType; -use solana_sdk::{account::Account, pubkey::Pubkey, slot_history::Slot}; +use solana_sdk::{account::Account as SolanaAccount, pubkey::Pubkey, slot_history::Slot}; use tokio::sync::broadcast::Receiver; use crate::commitment_utils::Commitment; +// 64 MB +const MAX_ACCOUNT_SIZE: usize = 64 * 1024 * 1024; + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum CompressionMethod { + None, + Lz4(i32), + Zstd(i32), +} + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum Data { + Uncompressed(Vec), + Lz4 { binary: Vec, len: usize }, + Zstd { binary: Vec, len: usize }, +} + +impl Data { + pub fn len(&self) -> usize { + match self { + Data::Uncompressed(d) => d.len(), + Data::Lz4 { len, .. } => *len, + Data::Zstd { len, .. } => *len, + } + } + + pub fn is_empty(&self) -> bool { + match self { + Data::Uncompressed(d) => d.is_empty(), + Data::Lz4 { len, .. } => *len == 0, + Data::Zstd { len, .. } => *len == 0, + } + } + + pub fn data(&self) -> Vec { + match self { + Data::Uncompressed(d) => d.clone(), + Data::Lz4 { binary, .. } => lz4::block::decompress(binary, None).unwrap(), + Data::Zstd { binary, .. } => zstd::bulk::decompress(binary, MAX_ACCOUNT_SIZE).unwrap(), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct Account { + /// lamports in the account + pub lamports: u64, + /// data held in this account + pub data: Data, + /// the program that owns this account. If executable, the program that loads this account. + pub owner: Pubkey, + /// this account's data contains a loaded program (and is now read-only) + pub executable: bool, + /// the epoch at which this account will next owe rent + pub rent_epoch: u64, +} + +impl Account { + pub fn from_solana_account( + account: SolanaAccount, + compression_method: CompressionMethod, + ) -> Self { + let data = match compression_method { + CompressionMethod::None => Data::Uncompressed(account.data), + CompressionMethod::Lz4(level) => { + let len = account.data.len(); + let binary = lz4::block::compress( + &account.data, + Some(lz4::block::CompressionMode::FAST(level)), + true, + ) + .unwrap(); + Data::Lz4 { binary, len } + } + CompressionMethod::Zstd(level) => { + let len = account.data.len(); + let binary = zstd::bulk::compress(&account.data, level).unwrap(); + Data::Zstd { binary, len } + } + }; + Self { + lamports: account.lamports, + data, + owner: account.owner, + executable: account.executable, + rent_epoch: account.rent_epoch, + } + } + + pub fn to_solana_account(&self) -> SolanaAccount { + SolanaAccount { + lamports: self.lamports, + data: self.data.data(), + owner: self.owner, + executable: self.executable, + rent_epoch: self.rent_epoch, + } + } +} + #[derive(Clone, Debug)] pub struct AccountData { pub pubkey: Pubkey, @@ -17,7 +117,7 @@ impl AccountData { pub fn allows(&self, filter: &RpcFilterType) -> bool { match filter { RpcFilterType::DataSize(size) => self.account.data.len() as u64 == *size, - RpcFilterType::Memcmp(compare) => compare.bytes_match(&self.account.data), + RpcFilterType::Memcmp(compare) => compare.bytes_match(&self.account.data.data()), RpcFilterType::TokenAccountState => { // todo false