From 4013f91dbe04522ecf4e6723f18edd85415894be Mon Sep 17 00:00:00 2001 From: Josh Date: Thu, 11 Feb 2021 11:32:46 -0800 Subject: [PATCH] RPC: add caching to getLargestAccounts (#15154) * introduce get largest accounts cache * remove cache size and change hash key * remove eq and hash derivation from commitment config * add slot to the cache --- client/src/lib.rs | 1 + client/src/rpc_cache.rs | 75 ++++++++++++++++++++++++++++++++++++++++ client/src/rpc_config.rs | 2 +- core/src/rpc.rs | 68 ++++++++++++++++++++++++++++-------- core/src/rpc_service.rs | 8 +++++ 5 files changed, 138 insertions(+), 16 deletions(-) create mode 100644 client/src/rpc_cache.rs diff --git a/client/src/lib.rs b/client/src/lib.rs index 7e8cce4acf..70b1d60195 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -8,6 +8,7 @@ pub mod mock_sender; pub mod nonce_utils; pub mod perf_utils; pub mod pubsub_client; +pub mod rpc_cache; pub mod rpc_client; pub mod rpc_config; pub mod rpc_custom_error; diff --git a/client/src/rpc_cache.rs b/client/src/rpc_cache.rs new file mode 100644 index 0000000000..2e2a1a12d6 --- /dev/null +++ b/client/src/rpc_cache.rs @@ -0,0 +1,75 @@ +use crate::{rpc_config::RpcLargestAccountsFilter, rpc_response::RpcAccountBalance}; +use std::{ + collections::HashMap, + time::{Duration, SystemTime}, +}; + +#[derive(Debug, Clone)] +pub struct LargestAccountsCache { + duration: u64, + cache: HashMap, LargestAccountsCacheValue>, +} + +#[derive(Debug, Clone)] +struct LargestAccountsCacheValue { + accounts: Vec, + slot: u64, + cached_time: SystemTime, +} + +impl LargestAccountsCache { + pub fn new(duration: u64) -> Self { + Self { + duration, + cache: HashMap::new(), + } + } + + pub fn get_largest_accounts( + &self, + filter: &Option, + ) -> Option<(u64, Vec)> { + self.cache.get(&filter).and_then(|value| { + if let Ok(elapsed) = value.cached_time.elapsed() { + if elapsed < Duration::from_secs(self.duration) { + return Some((value.slot, value.accounts.clone())); + } + } + None + }) + } + + pub fn set_largest_accounts( + &mut self, + filter: &Option, + slot: u64, + accounts: &[RpcAccountBalance], + ) { + self.cache.insert( + filter.clone(), + LargestAccountsCacheValue { + accounts: accounts.to_owned(), + slot, + cached_time: SystemTime::now(), + }, + ); + } +} + +#[cfg(test)] +pub mod test { + use super::*; + + #[test] + fn test_old_entries_expire() { + let mut cache = LargestAccountsCache::new(1); + + let filter = Some(RpcLargestAccountsFilter::Circulating); + + let accounts: Vec = Vec::new(); + + cache.set_largest_accounts(&filter, 1000, &accounts); + std::thread::sleep(Duration::from_secs(1)); + assert_eq!(cache.get_largest_accounts(&filter), None); + } +} diff --git a/client/src/rpc_config.rs b/client/src/rpc_config.rs index aeb1cf9ce1..4ee8ed4793 100644 --- a/client/src/rpc_config.rs +++ b/client/src/rpc_config.rs @@ -31,7 +31,7 @@ pub struct RpcSimulateTransactionConfig { pub encoding: Option, } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum RpcLargestAccountsFilter { Circulating, diff --git a/core/src/rpc.rs b/core/src/rpc.rs index eb54a1592c..1243e80c12 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -21,6 +21,7 @@ use solana_account_decoder::{ UiAccount, UiAccountData, UiAccountEncoding, UiDataSliceConfig, }; use solana_client::{ + rpc_cache::LargestAccountsCache, rpc_config::*, rpc_custom_error::RpcCustomError, rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType}, @@ -136,6 +137,7 @@ pub struct JsonRpcRequestProcessor { runtime: Arc, bigtable_ledger_storage: Option, optimistically_confirmed_bank: Arc>, + largest_accounts_cache: Arc>, } impl Metadata for JsonRpcRequestProcessor {} @@ -218,6 +220,7 @@ impl JsonRpcRequestProcessor { runtime: Arc, bigtable_ledger_storage: Option, optimistically_confirmed_bank: Arc>, + largest_accounts_cache: Arc>, ) -> (Self, Receiver) { let (sender, receiver) = channel(); ( @@ -235,6 +238,7 @@ impl JsonRpcRequestProcessor { runtime, bigtable_ledger_storage, optimistically_confirmed_bank, + largest_accounts_cache, }, receiver, ) @@ -274,6 +278,7 @@ impl JsonRpcRequestProcessor { optimistically_confirmed_bank: Arc::new(RwLock::new(OptimisticallyConfirmedBank { bank: bank.clone(), })), + largest_accounts_cache: Arc::new(RwLock::new(LargestAccountsCache::new(30))), } } @@ -505,33 +510,60 @@ impl JsonRpcRequestProcessor { self.bank(commitment).capitalization() } + fn get_cached_largest_accounts( + &self, + filter: &Option, + ) -> Option<(u64, Vec)> { + let largest_accounts_cache = self.largest_accounts_cache.read().unwrap(); + largest_accounts_cache.get_largest_accounts(filter) + } + + fn set_cached_largest_accounts( + &self, + filter: &Option, + slot: u64, + accounts: &[RpcAccountBalance], + ) { + let mut largest_accounts_cache = self.largest_accounts_cache.write().unwrap(); + largest_accounts_cache.set_largest_accounts(filter, slot, accounts) + } + fn get_largest_accounts( &self, config: Option, ) -> RpcResponse> { let config = config.unwrap_or_default(); let bank = self.bank(config.commitment); - let (addresses, address_filter) = if let Some(filter) = config.filter { - let non_circulating_supply = calculate_non_circulating_supply(&bank); - let addresses = non_circulating_supply.accounts.into_iter().collect(); - let address_filter = match filter { - RpcLargestAccountsFilter::Circulating => AccountAddressFilter::Exclude, - RpcLargestAccountsFilter::NonCirculating => AccountAddressFilter::Include, - }; - (addresses, address_filter) + + if let Some((slot, accounts)) = self.get_cached_largest_accounts(&config.filter) { + Response { + context: RpcResponseContext { slot }, + value: accounts, + } } else { - (HashSet::new(), AccountAddressFilter::Exclude) - }; - new_response( - &bank, - bank.get_largest_accounts(NUM_LARGEST_ACCOUNTS, &addresses, address_filter) + let (addresses, address_filter) = if let Some(filter) = config.clone().filter { + let non_circulating_supply = calculate_non_circulating_supply(&bank); + let addresses = non_circulating_supply.accounts.into_iter().collect(); + let address_filter = match filter { + RpcLargestAccountsFilter::Circulating => AccountAddressFilter::Exclude, + RpcLargestAccountsFilter::NonCirculating => AccountAddressFilter::Include, + }; + (addresses, address_filter) + } else { + (HashSet::new(), AccountAddressFilter::Exclude) + }; + let accounts = bank + .get_largest_accounts(NUM_LARGEST_ACCOUNTS, &addresses, address_filter) .into_iter() .map(|(address, lamports)| RpcAccountBalance { address: address.to_string(), lamports, }) - .collect(), - ) + .collect::>(); + + self.set_cached_largest_accounts(&config.filter, bank.slot(), &accounts); + new_response(&bank, accounts) + } } fn get_supply(&self, commitment: Option) -> RpcResponse { @@ -3185,6 +3217,7 @@ pub mod tests { Arc::new(tokio::runtime::Runtime::new().unwrap()), None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), + Arc::new(RwLock::new(LargestAccountsCache::new(30))), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); @@ -4594,6 +4627,7 @@ pub mod tests { Arc::new(tokio::runtime::Runtime::new().unwrap()), None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), + Arc::new(RwLock::new(LargestAccountsCache::new(30))), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); @@ -4790,6 +4824,7 @@ pub mod tests { Arc::new(tokio::runtime::Runtime::new().unwrap()), None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), + Arc::new(RwLock::new(LargestAccountsCache::new(30))), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); assert_eq!(request_processor.validator_exit(), false); @@ -4823,6 +4858,7 @@ pub mod tests { Arc::new(tokio::runtime::Runtime::new().unwrap()), None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), + Arc::new(RwLock::new(LargestAccountsCache::new(30))), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); assert_eq!(request_processor.validator_exit(), true); @@ -4915,6 +4951,7 @@ pub mod tests { Arc::new(tokio::runtime::Runtime::new().unwrap()), None, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), + Arc::new(RwLock::new(LargestAccountsCache::new(30))), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); assert_eq!( @@ -6144,6 +6181,7 @@ pub mod tests { Arc::new(tokio::runtime::Runtime::new().unwrap()), None, optimistically_confirmed_bank.clone(), + Arc::new(RwLock::new(LargestAccountsCache::new(30))), ); let mut io = MetaIoHandler::default(); diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 6811dfc8e5..898f713825 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -16,6 +16,7 @@ use jsonrpc_http_server::{ RequestMiddlewareAction, ServerBuilder, }; use regex::Regex; +use solana_client::rpc_cache::LargestAccountsCache; use solana_ledger::blockstore::Blockstore; use solana_metrics::inc_new_counter_info; use solana_runtime::{ @@ -35,6 +36,8 @@ use std::{ use tokio::runtime; use tokio_util::codec::{BytesCodec, FramedRead}; +const LARGEST_ACCOUNTS_CACHE_DURATION: u64 = 60 * 60 * 2; + pub struct JsonRpcService { thread_hdl: JoinHandle<()>, @@ -262,6 +265,10 @@ impl JsonRpcService { override_health_check, )); + let largest_accounts_cache = Arc::new(RwLock::new(LargestAccountsCache::new( + LARGEST_ACCOUNTS_CACHE_DURATION, + ))); + let tpu_address = cluster_info.my_contact_info().tpu; let runtime = Arc::new( runtime::Builder::new_multi_thread() @@ -322,6 +329,7 @@ impl JsonRpcService { runtime, bigtable_ledger_storage, optimistically_confirmed_bank, + largest_accounts_cache, ); let leader_info =