Updating accounts that are writable in the transactions, using lz4 compression

This commit is contained in:
godmodegalactus 2024-04-19 11:36:56 +02:00
parent f47a26aafb
commit c4009a2763
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
9 changed files with 341 additions and 149 deletions

22
Cargo.lock generated
View File

@ -2639,6 +2639,26 @@ version = "0.4.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
[[package]]
name = "lz4"
version = "1.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e9e2dd86df36ce760a60f6ff6ad526f7ba1f14ba0356f8254fb6905e6494df1"
dependencies = [
"libc",
"lz4-sys",
]
[[package]]
name = "lz4-sys"
version = "1.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "matchers"
version = "0.1.0"
@ -4438,6 +4458,7 @@ dependencies = [
"itertools 0.10.5",
"lazy_static",
"log",
"lz4",
"prometheus",
"quinn",
"rand 0.8.5",
@ -4460,6 +4481,7 @@ dependencies = [
"solana-version",
"thiserror",
"tokio",
"zstd 0.11.2+zstd.1.5.2",
]
[[package]]

View File

@ -1,4 +1,8 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use async_trait::async_trait;
use dashmap::DashSet;
@ -287,9 +291,14 @@ impl AccountStorageInterface for AccountsOnDemand {
}
}
async fn process_slot_data(&self, slot: Slot, commitment: Commitment) -> Vec<AccountData> {
async fn process_slot_data(
&self,
slot: Slot,
commitment: Commitment,
pubkeys: HashSet<Pubkey>,
) -> Vec<AccountData> {
self.accounts_storage
.process_slot_data(slot, commitment)
.process_slot_data(slot, commitment, pubkeys)
.await
}
}

View File

@ -43,6 +43,9 @@ lazy_static = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-util = { workspace = true }
zstd = "0.11.2"
lz4 = "1.24.0"
[dev-dependencies]
rand = "0.8.5"
rand_chacha = "0.3.1"

View File

@ -1,10 +1,11 @@
use std::{str::FromStr, sync::Arc};
use std::collections::HashSet;
use std::sync::Arc;
use anyhow::bail;
use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_account_decoder::{UiAccount, UiDataSliceConfig};
use solana_lite_rpc_core::types::BlockInfoStream;
use solana_account_decoder::UiAccount;
use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
structures::{
@ -13,15 +14,15 @@ use solana_lite_rpc_core::{
},
AnyhowJoinHandle,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::{
config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
response::RpcKeyedAccount,
};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot};
use solana_sdk::{pubkey::Pubkey, slot_history::Slot};
use tokio::sync::broadcast::Sender;
use crate::account_store_interface::{AccountLoadingError, AccountStorageInterface};
use crate::get_program_account::get_program_account;
lazy_static::lazy_static! {
static ref ACCOUNT_UPDATES: IntGauge =
@ -57,101 +58,28 @@ impl AccountService {
pub async fn populate_from_rpc(
&self,
rpc_client: Arc<RpcClient>,
rpc_url: String,
filters: &AccountFilters,
max_request_in_parallel: usize,
) -> anyhow::Result<()> {
const NB_ACCOUNTS_IN_GMA: usize = 100;
const NB_RETRY: usize = 10;
let mut accounts = vec![];
for filter in filters.iter() {
if !filter.accounts.is_empty() {
let mut f_accounts = filter
.accounts
.iter()
.map(|x| Pubkey::from_str(x).expect("Accounts in filters should be valid"))
.collect();
accounts.append(&mut f_accounts);
}
if let Some(program_id) = &filter.program_id {
let program_id =
Pubkey::from_str(program_id).expect("Program id in filters should be valid");
let mut rpc_acc = rpc_client
.get_program_accounts_with_config(
&program_id,
RpcProgramAccountsConfig {
filters: filter.get_rpc_filter(),
account_config: RpcAccountInfoConfig {
encoding: Some(solana_account_decoder::UiAccountEncoding::Base64),
data_slice: Some(UiDataSliceConfig {
offset: 0,
length: 0,
}),
commitment: None,
min_context_slot: None,
},
with_context: None,
},
)
.await?
.iter()
.map(|(pk, _)| *pk)
.collect_vec();
accounts.append(&mut rpc_acc);
}
}
log::info!("Fetching {} accounts", accounts.len());
for accounts in accounts.chunks(max_request_in_parallel * NB_ACCOUNTS_IN_GMA) {
for accounts in accounts.chunks(NB_ACCOUNTS_IN_GMA) {
let mut fetch_accounts = vec![];
let mut updated_slot = 0;
for _ in 0..NB_RETRY {
let accounts = rpc_client
.get_multiple_accounts_with_config(
accounts,
RpcAccountInfoConfig {
encoding: Some(solana_account_decoder::UiAccountEncoding::Base64),
data_slice: None,
commitment: Some(CommitmentConfig::finalized()),
min_context_slot: None,
},
)
.await;
match accounts {
Ok(response) => {
fetch_accounts = response.value;
updated_slot = response.context.slot;
break;
}
Err(e) => {
// retry
log::error!("Error fetching all the accounts {e:?}, retrying");
continue;
}
}
}
for (index, account) in fetch_accounts.iter().enumerate() {
if let Some(account) = account {
self.account_store
.initilize_or_update_account(AccountData {
pubkey: accounts[index],
account: Arc::new(account.clone()),
updated_slot,
})
.await;
}
}
}
}
log::info!("{} accounts successfully fetched", accounts.len());
Ok(())
get_program_account(
rpc_url,
filters,
max_request_in_parallel,
NB_RETRY,
NB_ACCOUNTS_IN_GMA,
self.account_store.clone(),
)
.await
}
pub fn process_account_stream(
&self,
mut account_stream: AccountStream,
mut blockinfo_stream: BlockInfoStream,
mut block_stream: BlockStream,
) -> Vec<AnyhowJoinHandle> {
let this = self.clone();
let processed_task = tokio::spawn(async move {
@ -187,19 +115,26 @@ impl AccountService {
let this = self.clone();
let block_processing_task = tokio::spawn(async move {
loop {
match blockinfo_stream.recv().await {
Ok(block_info) => {
if block_info.commitment_config.is_processed() {
match block_stream.recv().await {
Ok(block) => {
if block.commitment_config.is_processed() {
// processed commitment is not processed in this loop
continue;
}
let commitment = Commitment::from(block_info.commitment_config);
let commitment = Commitment::from(block.commitment_config);
let accounts_write_updated: HashSet<Pubkey> = block
.transactions
.iter()
.filter(|x| x.err.is_none())
.flat_map(|x| x.writable_accounts.clone())
.collect();
let updated_accounts = this
.account_store
.process_slot_data(block_info.slot, commitment)
.process_slot_data(block.slot, commitment, accounts_write_updated)
.await;
if block_info.commitment_config.is_finalized() {
if block.commitment_config.is_finalized() {
ACCOUNT_UPDATES_FINALIZED.add(updated_accounts.len() as i64)
} else {
ACCOUNT_UPDATES_CONFIRMED.add(updated_accounts.len() as i64);

View File

@ -1,3 +1,5 @@
use std::collections::HashSet;
use async_trait::async_trait;
use solana_lite_rpc_core::commitment_utils::Commitment;
use solana_lite_rpc_core::structures::account_data::AccountData;
@ -32,5 +34,10 @@ pub trait AccountStorageInterface: Send + Sync {
commitment: Commitment,
) -> Option<Vec<AccountData>>;
async fn process_slot_data(&self, slot: Slot, commitment: Commitment) -> Vec<AccountData>;
async fn process_slot_data(
&self,
slot: Slot,
commitment: Commitment,
writable_accounts: HashSet<Pubkey>,
) -> Vec<AccountData>;
}

View File

@ -0,0 +1,186 @@
use std::{str::FromStr, sync::Arc, time::Duration};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_json::json;
use solana_account_decoder::UiDataSliceConfig;
use solana_client::{
nonblocking::rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::OptionalContext,
};
use solana_lite_rpc_core::{
encoding::BASE64,
structures::{account_data::AccountData, account_filter::AccountFilters},
};
use solana_sdk::{
account::{Account, AccountSharedData, ReadableAccount},
commitment_config::CommitmentConfig,
pubkey::Pubkey,
};
use crate::account_store_interface::AccountStorageInterface;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct RpcKeyedCompressedAccount {
pub p: String,
pub a: String,
}
pub async fn get_program_account(
rpc_url: String,
filters: &AccountFilters,
max_request_in_parallel: usize,
number_of_retires: usize,
number_of_accounts_in_gma: usize,
account_store: Arc<dyn AccountStorageInterface>,
) -> anyhow::Result<()> {
// setting larget timeout because gPA can take a lot of time
let rpc_client = Arc::new(RpcClient::new_with_timeout_and_commitment(
rpc_url,
Duration::from_secs(60 * 10),
CommitmentConfig::processed(),
));
// use getGPA compressed if available
let mut accounts = vec![];
{
for filter in filters.iter() {
if !filter.accounts.is_empty() {
let mut f_accounts = filter
.accounts
.iter()
.map(|x| Pubkey::from_str(x).expect("Accounts in filters should be valid"))
.collect();
accounts.append(&mut f_accounts);
}
if let Some(program_id) = &filter.program_id {
log::info!("gPA for {}", program_id);
let program_id =
Pubkey::from_str(program_id).expect("Program id in filters should be valid");
let result = rpc_client
.send::<OptionalContext<Vec<RpcKeyedCompressedAccount>>>(
solana_client::rpc_request::RpcRequest::Custom {
method: "getProgramAccountsCompressed",
},
json!([
program_id.to_string(),
RpcProgramAccountsConfig {
filters: filter.get_rpc_filter(),
with_context: Some(true),
..Default::default()
}
]),
)
.await;
// failed to get over compressed program accounts
match result {
Ok(OptionalContext::Context(response)) => {
log::info!("Received compressed data for {}", program_id);
let updated_slot = response.context.slot;
for key_account in response.value {
let base64_decoded = BASE64.decode(&key_account.a)?;
// 64MB limit
let uncompressed = lz4::block::decompress(&base64_decoded, None)?;
let shared_data =
bincode::deserialize::<AccountSharedData>(&uncompressed)?;
let account = Account {
lamports: shared_data.lamports(),
data: shared_data.data().to_vec(),
owner: *shared_data.owner(),
executable: shared_data.executable(),
rent_epoch: shared_data.rent_epoch(),
};
account_store
.initilize_or_update_account(AccountData {
pubkey: Pubkey::from_str(&key_account.p)?,
account: Arc::new(account),
updated_slot,
})
.await;
}
}
_ => {
// getProgramAccountCompressed not available, using gPA instead
log::info!("fallback to gPA for {}", program_id);
let mut rpc_acc = rpc_client
.get_program_accounts_with_config(
&program_id,
RpcProgramAccountsConfig {
filters: filter.get_rpc_filter(),
account_config: RpcAccountInfoConfig {
encoding: Some(
solana_account_decoder::UiAccountEncoding::Base64,
),
data_slice: Some(UiDataSliceConfig {
offset: 0,
length: 0,
}),
commitment: None,
min_context_slot: None,
},
with_context: None,
},
)
.await?
.iter()
.map(|(pk, _)| *pk)
.collect_vec();
accounts.append(&mut rpc_acc);
}
}
}
}
}
log::info!("Fetching {} accounts", accounts.len());
for accounts in accounts.chunks(max_request_in_parallel * number_of_accounts_in_gma) {
for accounts in accounts.chunks(number_of_accounts_in_gma) {
let mut fetch_accounts = vec![];
let mut updated_slot = 0;
for _ in 0..number_of_retires {
let accounts = rpc_client
.get_multiple_accounts_with_config(
accounts,
RpcAccountInfoConfig {
encoding: Some(solana_account_decoder::UiAccountEncoding::Base64),
data_slice: None,
commitment: Some(CommitmentConfig::finalized()),
min_context_slot: None,
},
)
.await;
match accounts {
Ok(response) => {
fetch_accounts = response.value;
updated_slot = response.context.slot;
break;
}
Err(e) => {
// retry
log::error!("Error fetching all the accounts {e:?}, retrying");
continue;
}
}
}
for (index, account) in fetch_accounts.iter().enumerate() {
if let Some(account) = account {
account_store
.initilize_or_update_account(AccountData {
pubkey: accounts[index],
account: Arc::new(account.clone()),
updated_slot,
})
.await;
}
}
}
}
log::info!("{} accounts successfully fetched", accounts.len());
Ok(())
}

View File

@ -3,7 +3,6 @@ use std::{collections::HashSet, sync::Arc};
use crate::account_store_interface::{AccountLoadingError, AccountStorageInterface};
use async_trait::async_trait;
use dashmap::{DashMap, DashSet};
use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_lite_rpc_core::{commitment_utils::Commitment, structures::account_data::AccountData};
use solana_rpc_client_api::filter::RpcFilterType;
@ -126,6 +125,7 @@ impl AccountDataByCommitment {
// returns promoted account
pub fn promote_slot_commitment(
&mut self,
_pubkey: Pubkey,
slot: Slot,
commitment: Commitment,
) -> Option<(AccountData, Option<AccountData>)> {
@ -193,6 +193,7 @@ impl AccountDataByCommitment {
None
} else {
//log::warn!("Expected to have processed account update for slot {} data and pk {}", slot, pubkey);
None
}
}
@ -357,7 +358,12 @@ impl AccountStorageInterface for InmemoryAccountStore {
}
}
async fn process_slot_data(&self, slot: Slot, commitment: Commitment) -> Vec<AccountData> {
async fn process_slot_data(
&self,
slot: Slot,
commitment: Commitment,
writable_accounts: HashSet<Pubkey>,
) -> Vec<AccountData> {
match commitment {
Commitment::Confirmed => {
// insert slot and blockhash that were confirmed
@ -382,22 +388,30 @@ impl AccountStorageInterface for InmemoryAccountStore {
return vec![];
}
}
let updated_accounts = self
.account_store
.iter_mut()
.filter_map(|mut acc| acc.promote_slot_commitment(slot, commitment))
.collect_vec();
// update owners
updated_accounts
.iter()
.for_each(|(account_data, prev_account_data)| {
if let Some(prev_account_data) = prev_account_data {
if prev_account_data.account.owner != account_data.account.owner {
self.update_owner(prev_account_data, account_data, commitment);
let mut updated_accounts = vec![];
for writable_account in writable_accounts {
if let Some(mut account) = self.account_store.get_mut(&writable_account) {
if let Some((account_data, prev_account_data)) =
account.promote_slot_commitment(writable_account, slot, commitment)
{
if let Some(prev_account_data) = prev_account_data {
// check if owner has changed
if prev_account_data.account.owner != account_data.account.owner {
self.update_owner(&prev_account_data, &account_data, commitment);
}
//check if account data has changed
if prev_account_data != account_data {
updated_accounts.push(account_data);
}
} else {
// account has been confirmed first time
updated_accounts.push(account_data);
}
}
});
}
}
// update number of processed accounts in memory
let number_of_processed_accounts_in_memory: i64 = self
@ -408,20 +422,6 @@ impl AccountStorageInterface for InmemoryAccountStore {
TOTAL_PROCESSED_ACCOUNTS.set(number_of_processed_accounts_in_memory);
updated_accounts
.iter()
.filter_map(|(account_data, prev_account_data)| {
if let Some(prev_account_data) = prev_account_data {
if prev_account_data != account_data {
Some(account_data)
} else {
None
}
} else {
Some(account_data)
}
})
.cloned()
.collect_vec()
}
}
@ -433,7 +433,7 @@ impl Default for InmemoryAccountStore {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc};
use itertools::Itertools;
use rand::{rngs::ThreadRng, Rng};
@ -481,6 +481,11 @@ mod tests {
.await;
let account_data_1 = create_random_account(&mut rng, 0, pk2, program);
let mut pubkeys = HashSet::new();
pubkeys.insert(pk1);
pubkeys.insert(pk2);
store
.initilize_or_update_account(account_data_1.clone())
.await;
@ -542,7 +547,9 @@ mod tests {
Ok(Some(account_data_0.clone()))
);
store.process_slot_data(1, Commitment::Confirmed).await;
store
.process_slot_data(1, Commitment::Confirmed, pubkeys.clone())
.await;
assert_eq!(
store.get_account(pk1, Commitment::Processed).await,
@ -557,7 +564,9 @@ mod tests {
Ok(Some(account_data_0.clone()))
);
store.process_slot_data(2, Commitment::Confirmed).await;
store
.process_slot_data(2, Commitment::Confirmed, pubkeys.clone())
.await;
assert_eq!(
store.get_account(pk1, Commitment::Processed).await,
@ -572,7 +581,9 @@ mod tests {
Ok(Some(account_data_0.clone()))
);
store.process_slot_data(1, Commitment::Finalized).await;
store
.process_slot_data(1, Commitment::Finalized, pubkeys)
.await;
assert_eq!(
store.get_account(pk1, Commitment::Processed).await,
@ -595,6 +606,9 @@ mod tests {
let program = Pubkey::new_unique();
let pk1 = Pubkey::new_unique();
let mut pubkeys = HashSet::new();
pubkeys.insert(pk1);
let mut rng = rand::thread_rng();
store
@ -681,7 +695,9 @@ mod tests {
.len(),
12
);
store.process_slot_data(11, Commitment::Finalized).await;
store
.process_slot_data(11, Commitment::Finalized, pubkeys.clone())
.await;
assert_eq!(
store
.account_store
@ -698,7 +714,9 @@ mod tests {
);
// check finalizing previous commitment does not affect
store.process_slot_data(8, Commitment::Finalized).await;
store
.process_slot_data(8, Commitment::Finalized, pubkeys)
.await;
assert_eq!(
store.get_account(pk1, Commitment::Finalized).await,
@ -715,34 +733,37 @@ mod tests {
let mut rng = rand::thread_rng();
let pks = (0..5).map(|_| Pubkey::new_unique()).collect_vec();
let pubkeys: HashSet<Pubkey> = pks.iter().cloned().collect();
store
.update_account(
create_random_account(&mut rng, 1, Pubkey::new_unique(), prog_1),
create_random_account(&mut rng, 1, pks[0], prog_1),
Commitment::Confirmed,
)
.await;
store
.update_account(
create_random_account(&mut rng, 1, Pubkey::new_unique(), prog_1),
create_random_account(&mut rng, 1, pks[1], prog_1),
Commitment::Confirmed,
)
.await;
store
.update_account(
create_random_account(&mut rng, 1, Pubkey::new_unique(), prog_1),
create_random_account(&mut rng, 1, pks[2], prog_1),
Commitment::Confirmed,
)
.await;
store
.update_account(
create_random_account(&mut rng, 1, Pubkey::new_unique(), prog_1),
create_random_account(&mut rng, 1, pks[3], prog_1),
Commitment::Confirmed,
)
.await;
store
.update_account(
create_random_account(&mut rng, 1, Pubkey::new_unique(), prog_2),
create_random_account(&mut rng, 1, pks[4], prog_2),
Commitment::Confirmed,
)
.await;
@ -792,7 +813,9 @@ mod tests {
.await;
assert!(acc_prgram_3.is_none());
store.process_slot_data(1, Commitment::Finalized).await;
store
.process_slot_data(1, Commitment::Finalized, pubkeys.clone())
.await;
let acc_prgram_1 = store
.get_program_accounts(prog_1, None, Commitment::Finalized)
@ -812,7 +835,9 @@ mod tests {
store
.update_account(account_finalized.clone(), Commitment::Finalized)
.await;
store.process_slot_data(2, Commitment::Finalized).await;
store
.process_slot_data(2, Commitment::Finalized, pubkeys.clone())
.await;
let account_confirmed = create_random_account(&mut rng, 3, pk, prog_3);
store
@ -847,8 +872,12 @@ mod tests {
assert_eq!(f, Some(vec![account_finalized.clone()]));
store.process_slot_data(3, Commitment::Finalized).await;
store.process_slot_data(4, Commitment::Confirmed).await;
store
.process_slot_data(3, Commitment::Finalized, pubkeys.clone())
.await;
store
.process_slot_data(4, Commitment::Confirmed, pubkeys.clone())
.await;
let f = store
.get_program_accounts(prog_3, None, Commitment::Finalized)
@ -866,7 +895,9 @@ mod tests {
assert_eq!(p_3, Some(vec![]));
assert_eq!(p_4, Some(vec![account_processed.clone()]));
store.process_slot_data(4, Commitment::Finalized).await;
store
.process_slot_data(4, Commitment::Finalized, pubkeys)
.await;
let p_3 = store
.get_program_accounts(prog_3, None, Commitment::Finalized)
.await;

View File

@ -1,3 +1,4 @@
pub mod account_service;
pub mod account_store_interface;
pub mod get_program_account;
pub mod inmemory_account_store;

View File

@ -209,14 +209,12 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
let account_service = AccountService::new(account_storage, account_notification_sender);
account_service.process_account_stream(
account_stream.resubscribe(),
blockinfo_notifier.resubscribe(),
);
account_service
.process_account_stream(account_stream.resubscribe(), blocks_notifier.resubscribe());
account_service
.populate_from_rpc(
rpc_client.clone(),
rpc_client.url(),
&account_filters,
MAX_CONNECTIONS_IN_PARALLEL,
)