Optimizing GPA

Godmode Galactus 2024-11-22 11:21:16 +01:00
parent fb0f1bb453
commit 5e36199f17
15 changed files with 1325 additions and 68 deletions

View File

@@ -53,7 +53,7 @@ impl AccountDataByCommitment {
     pub fn get_account_data_filtered(
         &self,
         commitment: Commitment,
-        mut filters: Vec<AccountFilterType>,
+        filters: &Vec<AccountFilterType>,
     ) -> Option<AccountData> {
         let account_data = match commitment {
             Commitment::Processed => self
@@ -69,28 +69,15 @@ impl AccountDataByCommitment {
             return None;
         };
-        let tmp_filters = filters.clone();
-        let size = tmp_filters.iter().enumerate().find(|x| match x.1 {
-            AccountFilterType::Datasize(_) => true,
-            AccountFilterType::Memcmp(_) => false,
-        });
-        // filter by size
-        match size {
-            Some((index, AccountFilterType::Datasize(size))) => {
-                if *size != account_data.account.data.len() as u64 {
-                    return None;
-                } else {
-                    filters.remove(index);
-                }
-            }
-            None => {
-                // check other filters
-            }
-            Some(_) => {
-                return None;
-            }
-        }
+        // check size filter first before decompressing
+        for filter in filters {
+            if let AccountFilterType::Datasize(size) = filter {
+                if account_data.account.data.len() as u64 != *size {
+                    return None;
+                }
+            }
+        }
         // match other filters
         if !filters.is_empty() {
             let data = account_data.account.data.data();
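
The point of the rewrite above: a Datasize filter can be answered from stored metadata alone, so it is now checked before the account payload is decompressed, and the filter list is borrowed rather than cloned and mutated. A minimal sketch of the pattern, with simplified stand-ins for the crate's AccountFilterType and Data types:

enum Filter {
    Datasize(u64),
    Memcmp { offset: usize, bytes: Vec<u8> },
}

// Reject on cheap metadata checks first; decompress only when a filter
// actually needs the bytes. `decompress` stands in for Data::data().
fn passes(filters: &[Filter], stored_len: u64, decompress: impl Fn() -> Vec<u8>) -> bool {
    if filters
        .iter()
        .any(|f| matches!(f, Filter::Datasize(size) if *size != stored_len))
    {
        return false; // size mismatch rejected without touching the payload
    }
    if filters.iter().any(|f| matches!(f, Filter::Memcmp { .. })) {
        let data = decompress();
        return filters.iter().all(|f| match f {
            Filter::Memcmp { offset, bytes } => {
                data.get(*offset..*offset + bytes.len()) == Some(bytes.as_slice())
            }
            Filter::Datasize(_) => true,
        });
    }
    true
}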

View File

@@ -16,6 +16,7 @@ use lite_account_manager_common::{
 use prometheus::{opts, register_int_gauge, IntGauge};
 use solana_sdk::{pubkey::Pubkey, slot_history::Slot};
 use std::collections::BTreeMap;
+use std::sync::RwLock;
 lazy_static::lazy_static! {
     static ref ACCOUNT_STORED_IN_MEMORY: IntGauge =
@@ -36,7 +37,7 @@ struct SlotStatus {
 pub struct InmemoryAccountStore {
     account_store: Arc<DashMap<Pubkey, AccountDataByCommitment>>,
-    accounts_by_owner: Arc<DashMap<Pubkey, HashSet<Pubkey>>>,
+    accounts_by_owner: Arc<DashMap<Pubkey, Arc<RwLock<HashSet<Pubkey>>>>>,
     slots_status: Arc<Mutex<BTreeMap<Slot, SlotStatus>>>,
     filtered_accounts: Arc<dyn AccountFiltersStoreInterface>,
 }
@@ -54,12 +55,12 @@ impl InmemoryAccountStore {
     fn add_account_owner(&self, account: Pubkey, owner: Pubkey) {
         match self.accounts_by_owner.entry(owner) {
             dashmap::mapref::entry::Entry::Occupied(mut occ) => {
-                occ.get_mut().insert(account);
+                occ.get_mut().write().unwrap().insert(account);
             }
             dashmap::mapref::entry::Entry::Vacant(vc) => {
                 let mut set = HashSet::new();
                 set.insert(account);
-                vc.insert(set);
+                vc.insert(Arc::new(RwLock::new(set)));
             }
         }
     }
@@ -81,7 +82,10 @@ impl InmemoryAccountStore {
             .entry(prev_account_data.account.owner)
         {
             dashmap::mapref::entry::Entry::Occupied(mut occ) => {
-                occ.get_mut().remove(&prev_account_data.pubkey);
+                occ.get_mut()
+                    .write()
+                    .unwrap()
+                    .remove(&prev_account_data.pubkey);
             }
             dashmap::mapref::entry::Entry::Vacant(_) => {
                 // do nothing
@@ -237,48 +241,48 @@ impl AccountStorageInterface for InmemoryAccountStore {
         }) {
             return Err(AccountLoadingError::ConfigDoesnotContainRequiredFilters);
         }
-        match self.accounts_by_owner.entry(program_pubkey) {
-            dashmap::mapref::entry::Entry::Occupied(occ) => {
-                let mut return_vec = vec![];
-                let program_pubkeys = occ.get();
-                for program_account in program_pubkeys {
-                    match &account_filters {
-                        Some(account_filters) => {
-                            match self.account_store.entry(*program_account) {
-                                dashmap::mapref::entry::Entry::Occupied(occ) => {
-                                    let account = occ.get().get_account_data_filtered(
-                                        commitment,
-                                        account_filters.clone(),
-                                    );
-                                    drop(occ);
-                                    if let Some(account_data) = account {
-                                        if account_data.account.lamports > 0
-                                            && account_data.account.owner == program_pubkey
-                                        {
-                                            return_vec.push(account_data);
-                                        }
-                                    }
-                                }
-                                dashmap::mapref::entry::Entry::Vacant(_) => {
-                                    // do nothing
-                                }
-                            };
-                        }
-                        None => {
-                            let account_data = self.get_account(*program_account, commitment)?;
-                            if let Some(account_data) = account_data {
-                                // recheck owner
-                                if program_pubkey == account_data.account.owner {
-                                    return_vec.push(account_data);
-                                }
-                            }
-                        }
-                    }
-                }
-                Ok(return_vec)
-            }
-            dashmap::mapref::entry::Entry::Vacant(_) => Ok(vec![]),
-        }
+        let lk = match self.accounts_by_owner.entry(program_pubkey) {
+            dashmap::mapref::entry::Entry::Occupied(occ) => occ.get().clone(),
+            dashmap::mapref::entry::Entry::Vacant(_) => {
+                return Ok(vec![]);
+            }
+        };
+        let mut return_vec = vec![];
+        for program_account in lk.read().unwrap().iter() {
+            match &account_filters {
+                Some(account_filters) => {
+                    match self.account_store.entry(*program_account) {
+                        dashmap::mapref::entry::Entry::Occupied(occ) => {
+                            let account = occ
+                                .get()
+                                .get_account_data_filtered(commitment, account_filters);
+                            drop(occ);
+                            if let Some(account_data) = account {
+                                if account_data.account.lamports > 0
+                                    && account_data.account.owner == program_pubkey
+                                {
+                                    return_vec.push(account_data);
+                                }
+                            }
+                        }
+                        dashmap::mapref::entry::Entry::Vacant(_) => {
+                            // do nothing
+                        }
+                    };
+                }
+                None => {
+                    let account_data = self.get_account(*program_account, commitment)?;
+                    if let Some(account_data) = account_data {
+                        // recheck owner
+                        if program_pubkey == account_data.account.owner {
+                            return_vec.push(account_data);
+                        }
+                    }
+                }
+            }
+        }
+        Ok(return_vec)
+    }
 
     fn process_slot_data(&self, slot_info: SlotInfo, commitment: Commitment) -> Vec<AccountData> {
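
The structural change in this file: the owner index now maps each program to an Arc<RwLock<HashSet<Pubkey>>>, so get_program_accounts clones the Arc and releases the DashMap shard guard before walking a potentially large pubkey set, instead of holding the shard lock for the whole scan. The same shape in isolation (a sketch; u64 stands in for Pubkey, and the real code goes through entry() rather than get()):

use dashmap::DashMap;
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

fn scan(index: &DashMap<u64, Arc<RwLock<HashSet<u64>>>>, owner: u64) -> Vec<u64> {
    let set = match index.get(&owner) {
        Some(entry) => entry.value().clone(), // cheap Arc clone
        None => return Vec::new(),
    }; // DashMap shard guard is dropped here
    // The long iteration now holds only the per-owner RwLock.
    set.read().unwrap().iter().copied().collect()
}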

View File

@@ -24,13 +24,13 @@ pub enum Data {
 }
 impl Data {
-    pub fn new(data:&[u8], compression_method: CompressionMethod) -> Self {
+    pub fn new(data: &[u8], compression_method: CompressionMethod) -> Self {
         match compression_method {
             CompressionMethod::None => Data::Uncompressed(data.to_vec()),
             CompressionMethod::Lz4(level) => {
                 let len = data.len();
                 let binary = lz4::block::compress(
-                    &data,
+                    data,
                     Some(lz4::block::CompressionMode::FAST(level)),
                     true,
                 )
@@ -39,7 +39,7 @@ impl Data {
             }
             CompressionMethod::Zstd(level) => {
                 let len = data.len();
-                let binary = zstd::bulk::compress(&data, level).unwrap();
+                let binary = zstd::bulk::compress(data, level).unwrap();
                 Data::Zstd { binary, len }
             }
         }
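
For context: this enum pairs the compressed bytes with the original length, which is exactly what lets the Datasize short-circuit in the first file run without decompressing. A sketch of the layout and its accessors, assuming the same lz4/zstd crates (the real type lives in lite-account-manager-common):

enum Data {
    Uncompressed(Vec<u8>),
    Lz4 { binary: Vec<u8>, len: usize },
    Zstd { binary: Vec<u8>, len: usize },
}

impl Data {
    // Answered from metadata; no codec involved.
    fn len(&self) -> usize {
        match self {
            Data::Uncompressed(d) => d.len(),
            Data::Lz4 { len, .. } | Data::Zstd { len, .. } => *len,
        }
    }

    // Decompression is paid only here.
    fn data(&self) -> Vec<u8> {
        match self {
            Data::Uncompressed(d) => d.clone(),
            // compress() was called with prepend_size = true, so None works here
            Data::Lz4 { binary, .. } => lz4::block::decompress(binary, None).unwrap(),
            Data::Zstd { binary, len } => zstd::bulk::decompress(binary, *len).unwrap(),
        }
    }
}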

View File

@@ -0,0 +1,44 @@
[package]
name = "lite-accounts-simulate_from_snapshot"
version = "0.1.0"
edition = "2021"
[dependencies]
lite-account-manager-common = { workspace = true }
lite-account-storage = { workspace = true }
lite-token-account-storage = { workspace = true }
solana-sdk = {workspace = true}
solana-client = {workspace = true}
solana-accounts-db = {workspace = true}
solana-frozen-abi-macro = {workspace = true}
solana-runtime = {workspace = true}
solana-rpc-client-api = { workspace = true }
solana-account-decoder = { workspace = true }
itertools = { workspace = true }
futures = { workspace = true }
lazy_static = {workspace = true}
prometheus = {workspace = true}
log = {workspace = true}
bincode = {workspace = true}
serde = { workspace = true }
anyhow = { workspace = true }
clap = { version = "4.5.4", features = ["derive"] }
memmap2 = "0.5.5"
thiserror = "1.0.31"
tar = "0.4.38"
tokio = { version = "1.38.0", features = ["rt-multi-thread", "macros"]}
zstd = "0.13.2"
async-trait = "0.1.73"
env_logger = "0.9.0"
base64 = "0.21.0"
jsonrpsee = { version = "0.20.0", features = ["macros", "full"] }
tower = "0.4.13"
tower-http = { version = "0.4.0", features = ["full"] }
quic-geyser-client = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", branch = "main" }
quic-geyser-common = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", branch = "main" }

View File

@@ -0,0 +1,11 @@
use clap::Parser;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(short = 's', long)]
pub snapshot_archive_path: String,
#[arg(short = 'u', long)]
pub quic_url: Option<String>,
}
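
clap's derive macro generates the whole parser from this struct. A hypothetical invocation and the single call that consumes it (binary name assumed from the package name above):

use clap::Parser; // Args as defined above

fn main() {
    // simulate_from_snapshot -s /path/to/snapshot.tar.zst -u <quic-endpoint>
    let args = Args::parse(); // prints usage and exits on bad flags
    println!("snapshot: {}, quic: {:?}", args.snapshot_archive_path, args.quic_url);
}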

View File

@@ -0,0 +1,168 @@
use std::{fs::File, path::PathBuf, str::FromStr, sync::Arc};
use clap::Parser;
use cli::Args;
use lite_account_manager_common::{
account_data::{Account, AccountData, CompressionMethod, Data},
account_store_interface::AccountStorageInterface,
commitment::Commitment,
slot_info::SlotInfo,
};
use lite_token_account_storage::{
inmemory_token_account_storage::InmemoryTokenAccountStorage,
inmemory_token_storage::TokenProgramAccountsStorage,
};
use quic_geyser_common::{
filters::Filter, message::Message, types::connections_parameters::ConnectionParameters,
};
use snapshot_utils::{append_vec_iter, archived::ArchiveSnapshotExtractor, SnapshotExtractor};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use crate::rpc_server::RpcServerImpl;
pub mod cli;
pub mod rpc_server;
pub mod snapshot_utils;
#[tokio::main(worker_threads = 2)]
async fn main() {
env_logger::init_from_env(
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
);
let args = Args::parse();
println!("tester args : {:?}", args);
let Args {
snapshot_archive_path,
quic_url,
} = args;
let token_account_storage = Arc::new(InmemoryTokenAccountStorage::default());
let token_storage: Arc<dyn AccountStorageInterface> =
Arc::new(TokenProgramAccountsStorage::new(token_account_storage));
// fill from quic geyser stream
if let Some(quic_url) = quic_url {
let token_storage = token_storage.clone();
tokio::spawn(async move {
let (quic_client, mut reciever, _jh) =
quic_geyser_client::non_blocking::client::Client::new(
quic_url,
ConnectionParameters::default(),
)
.await
.unwrap();
quic_client
.subscribe(vec![Filter::AccountsAll, Filter::Slot])
.await
.unwrap();
while let Some(message) = reciever.recv().await {
match message {
Message::AccountMsg(account) => {
let compression_method = match account.compression_type {
quic_geyser_common::compression::CompressionType::None => {
CompressionMethod::None
}
quic_geyser_common::compression::CompressionType::Lz4Fast(v) => {
CompressionMethod::Lz4(v)
}
quic_geyser_common::compression::CompressionType::Lz4(v) => {
CompressionMethod::Lz4(v)
}
};
let account_data = AccountData {
pubkey: account.pubkey,
account: Arc::new(Account {
lamports: account.lamports,
data: Data::new(&account.data, compression_method),
owner: account.owner,
executable: account.executable,
rent_epoch: account.rent_epoch,
}),
updated_slot: account.slot_identifier.slot,
write_version: account.write_version,
};
token_storage.update_account(
account_data,
lite_account_manager_common::commitment::Commitment::Processed,
);
}
Message::SlotMsg(slot_msg) => {
if slot_msg.commitment_config == CommitmentConfig::confirmed()
|| slot_msg.commitment_config == CommitmentConfig::finalized()
{
let commitment =
if slot_msg.commitment_config == CommitmentConfig::confirmed() {
Commitment::Confirmed
} else {
Commitment::Finalized
};
token_storage.process_slot_data(
SlotInfo {
slot: slot_msg.slot,
parent: slot_msg.parent,
root: 0,
},
commitment,
);
}
}
_ => {
//not supported
}
}
}
println!("stopping geyser stream");
log::error!("stopping geyser stream");
});
}
// load accounts from snapshot
let bk = {
let token_storage = token_storage.clone();
let token_program =
Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap();
tokio::task::spawn_blocking(move || {
let archive_path = PathBuf::from_str(snapshot_archive_path.as_str()).unwrap();
let mut loader: ArchiveSnapshotExtractor<File> =
ArchiveSnapshotExtractor::open(&archive_path).unwrap();
for vec in loader.iter() {
let append_vec = vec.unwrap();
// info!("size: {:?}", append_vec.len());
for handle in append_vec_iter(&append_vec) {
let stored = handle.access().unwrap();
if stored.account_meta.owner != token_program {
continue;
}
let data = stored.data;
let compressed_data = Data::new(data, CompressionMethod::None);
token_storage.initilize_or_update_account(AccountData {
pubkey: stored.meta.pubkey,
account: Arc::new(Account {
lamports: stored.account_meta.lamports,
data: compressed_data,
owner: stored.account_meta.owner,
executable: stored.account_meta.executable,
rent_epoch: stored.account_meta.rent_epoch,
}),
updated_slot: 0,
write_version: 0,
});
}
}
})
};
// wait for the snapshot load to finish
bk.await.unwrap();
log::info!("Storage Initialized with snapshot");
let rpc_server = RpcServerImpl::new(token_storage);
let jh = RpcServerImpl::start_serving(rpc_server, 10700)
.await
.unwrap();
let _ = jh.await;
}
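
One detail worth calling out: snapshot accounts are written with updated_slot: 0 and write_version: 0, while the geyser stream (started before the snapshot load) carries real slots and write versions, so an account seen on both paths resolves in favor of the live update. A sketch of that last-writer-wins rule, under the assumption that the storage orders updates on the (updated_slot, write_version) pair:

// Field order gives lexicographic Ord: slot first, then write_version.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Version {
    updated_slot: u64,
    write_version: u64,
}

fn should_replace(existing: Version, incoming: Version) -> bool {
    // A snapshot row at (0, 0) never overwrites a live update.
    incoming > existing
}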

View File

@@ -0,0 +1,232 @@
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use base64::Engine;
use itertools::Itertools;
use jsonrpsee::server::ServerBuilder;
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use lite_account_manager_common::account_filter::AccountFilterType as AmAccountFilterType;
use lite_account_manager_common::account_store_interface::AccountStorageInterface;
use lite_account_manager_common::{account_data::AccountData, commitment::Commitment};
use solana_account_decoder::UiAccount;
use solana_rpc_client_api::client_error::reqwest::Method;
use solana_rpc_client_api::{
config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
response::{OptionalContext, Response as RpcResponse, RpcKeyedAccount, RpcResponseContext},
};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use tokio::task::JoinHandle;
use tower_http::cors::{Any, CorsLayer};
#[rpc(server)]
pub trait TestRpc {
#[method(name = "getProgramAccounts")]
async fn get_program_accounts(
&self,
program_id_str: String,
config: Option<RpcProgramAccountsConfig>,
) -> RpcResult<OptionalContext<Vec<RpcKeyedAccount>>>;
#[method(name = "getSnapshot")]
async fn get_snapshot(&self, program_id_str: String) -> RpcResult<String>;
#[method(name = "getAccountInfo")]
async fn get_account_info(
&self,
pubkey_str: String,
config: Option<RpcAccountInfoConfig>,
) -> RpcResult<RpcResponse<Option<UiAccount>>>;
}
pub struct RpcServerImpl {
storage: Arc<dyn AccountStorageInterface>,
}
impl RpcServerImpl {
pub fn new(storage: Arc<dyn AccountStorageInterface>) -> Self {
Self { storage }
}
pub async fn start_serving(
rpc_impl: RpcServerImpl,
port: u16,
) -> anyhow::Result<JoinHandle<()>> {
let http_addr = format!("[::]:{port}");
let cors = CorsLayer::new()
.max_age(Duration::from_secs(86400))
// Allow `POST` when accessing the resource
.allow_methods([Method::POST, Method::GET, Method::OPTIONS])
// Allow requests from any origin
.allow_origin(Any)
.allow_headers(Any);
let middleware = tower::ServiceBuilder::new().layer(cors);
let http_server_handle = ServerBuilder::default()
.set_middleware(middleware)
.max_connections(10)
.max_request_body_size(1024 * 1024) // 1 MB
.max_response_body_size(512 * 1024 * 1024) // 512 MB
.http_only()
.build(http_addr.clone())
.await?
.start(rpc_impl.into_rpc());
let jh = tokio::spawn(async move {
log::info!("HTTP Server started at {http_addr:?}");
http_server_handle.stopped().await;
log::error!("QUIC GEYSER PLUGIN HTTP SERVER STOPPED");
});
Ok(jh)
}
}
#[jsonrpsee::core::async_trait]
impl TestRpcServer for RpcServerImpl {
async fn get_program_accounts(
&self,
program_id_str: String,
config: Option<RpcProgramAccountsConfig>,
) -> RpcResult<OptionalContext<Vec<RpcKeyedAccount>>> {
let Ok(program_id) = Pubkey::from_str(&program_id_str) else {
return Err(jsonrpsee::types::error::ErrorCode::InternalError.into());
};
let with_context = config
.as_ref()
.map(|value| value.with_context.unwrap_or_default())
.unwrap_or_default();
let commitment: CommitmentConfig = config
.as_ref()
.and_then(|x| x.account_config.commitment)
.unwrap_or_default();
let account_filters = config
.as_ref()
.map(|x| {
x.filters
.as_ref()
.map(|filters| filters.iter().map(AmAccountFilterType::from).collect_vec())
})
.unwrap_or_default();
let commitment = Commitment::from(commitment);
let gpa = self
.storage
.get_program_accounts(program_id, account_filters, commitment)
.map_err(|_| jsonrpsee::types::error::ErrorCode::InternalError)?;
let min_context_slot = config
.as_ref()
.map(|c| {
if c.with_context.unwrap_or_default() {
c.account_config.min_context_slot
} else {
None
}
})
.unwrap_or_default()
.unwrap_or_default();
let slot = gpa
.iter()
.map(|program_account| program_account.updated_slot)
.max()
.unwrap_or_default();
let acc_config = config.map(|c| c.account_config);
let rpc_keyed_accounts = gpa
.iter()
.filter_map(|account_data| {
if account_data.updated_slot >= min_context_slot {
Some(RpcKeyedAccount {
pubkey: account_data.pubkey.to_string(),
account: convert_account_data_to_ui_account(
account_data,
acc_config.clone(),
),
})
} else {
None
}
})
.collect_vec();
if with_context {
Ok(OptionalContext::Context(RpcResponse {
context: RpcResponseContext {
slot,
api_version: None,
},
value: rpc_keyed_accounts,
}))
} else {
Ok(OptionalContext::NoContext(rpc_keyed_accounts))
}
}
async fn get_snapshot(&self, program_id_str: String) -> RpcResult<String> {
let program_id = Pubkey::from_str(program_id_str.as_str())
.map_err(|_| jsonrpsee::types::error::ErrorCode::InvalidParams)?;
let res = self
.storage
.create_snapshot(program_id)
.map_err(|_| jsonrpsee::types::error::ErrorCode::InternalError)?;
Ok(base64::engine::general_purpose::STANDARD.encode(res))
}
async fn get_account_info(
&self,
pubkey_str: String,
config: Option<RpcAccountInfoConfig>,
) -> RpcResult<RpcResponse<Option<UiAccount>>> {
let account_pk = Pubkey::from_str(pubkey_str.as_str())
.map_err(|_| jsonrpsee::types::error::ErrorCode::InvalidParams)?;
let commitment = config
.clone()
.and_then(|x| x.commitment)
.unwrap_or_default();
let acc = self
.storage
.get_account(account_pk, Commitment::from(commitment))
.map_err(|_| jsonrpsee::types::error::ErrorCode::InternalError)?;
match acc {
Some(acc) => Ok(RpcResponse {
context: RpcResponseContext {
slot: acc.updated_slot,
api_version: None,
},
value: Some(convert_account_data_to_ui_account(&acc, config)),
}),
None => Ok(RpcResponse {
context: RpcResponseContext {
slot: 0,
api_version: None,
},
value: None,
}),
}
}
}
pub fn convert_account_data_to_ui_account(
account_data: &AccountData,
config: Option<RpcAccountInfoConfig>,
) -> UiAccount {
let encoding = config
.as_ref()
.map(|c| c.encoding)
.unwrap_or_default()
.unwrap_or(solana_account_decoder::UiAccountEncoding::Base64);
let data_slice = config.as_ref().map(|c| c.data_slice).unwrap_or_default();
UiAccount::encode(
&account_data.pubkey,
&account_data.account.to_solana_account(),
encoding,
None,
data_slice,
)
}
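
Because the server registers the standard Solana method names, any JSON-RPC client can exercise it. A hedged sketch using jsonrpsee's own HTTP client (0.20 API, serde_json assumed as a dependency; port 10700 as in main.rs):

use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::rpc_params;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = HttpClientBuilder::default().build("http://127.0.0.1:10700")?;
    let account: serde_json::Value = client
        .request(
            "getAccountInfo",
            rpc_params!["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
        )
        .await?;
    println!("{account}");
    Ok(())
}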

View File

@@ -0,0 +1,229 @@
// Copyright 2022 Solana Foundation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file contains code vendored from https://github.com/solana-labs/solana
// Source: solana/runtime/src/append_vec.rs
use {
log::*,
memmap2::{Mmap, MmapMut},
solana_accounts_db::{
account_storage::meta::{AccountMeta, StoredMeta},
accounts_file::ALIGN_BOUNDARY_OFFSET,
append_vec::MAXIMUM_APPEND_VEC_FILE_SIZE,
u64_align,
},
solana_sdk::{
account::{Account, AccountSharedData},
hash::Hash,
},
std::{
convert::TryFrom,
fs::OpenOptions,
io::{self, Read},
mem,
path::Path,
},
};
/// References to account data stored elsewhere. Getting an `Account` requires cloning
/// (see `StoredAccountMeta::clone_account()`).
#[derive(PartialEq, Eq, Debug)]
pub struct StoredAccountMeta<'a> {
pub meta: &'a StoredMeta,
/// account data
pub account_meta: &'a AccountMeta,
pub data: &'a [u8],
pub offset: usize,
pub stored_size: usize,
pub hash: &'a Hash,
}
impl<'a> StoredAccountMeta<'a> {
/// Return a new Account by copying all the data referenced by the `StoredAccountMeta`.
pub fn clone_account(&self) -> AccountSharedData {
AccountSharedData::from(Account {
lamports: self.account_meta.lamports,
owner: self.account_meta.owner,
executable: self.account_meta.executable,
rent_epoch: self.account_meta.rent_epoch,
data: self.data.to_vec(),
})
}
}
/// A thread-safe, file-backed block of memory used to store `Account` instances. Append operations
/// are serialized such that only one thread updates the internal `append_lock` at a time. No
/// restrictions are placed on reading. That is, one may read items from one thread while another
/// is appending new items.
pub struct AppendVec {
/// A file-backed block of memory that is used to store the data for each appended item.
map: Mmap,
/// The number of bytes used to store items, not the number of items.
current_len: usize,
/// The number of bytes available for storing items.
file_size: u64,
slot: u64,
}
impl AppendVec {
fn sanitize_len_and_size(current_len: usize, file_size: usize) -> io::Result<()> {
if file_size == 0 {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("too small file size {} for AppendVec", file_size),
))
} else if usize::try_from(MAXIMUM_APPEND_VEC_FILE_SIZE)
.map(|max| file_size > max)
.unwrap_or(true)
{
Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("too large file size {} for AppendVec", file_size),
))
} else if current_len > file_size {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("current_len is larger than file size ({})", file_size),
))
} else {
Ok(())
}
}
/// how many more bytes can be stored in this append vec
pub const fn remaining_bytes(&self) -> u64 {
(self.capacity()).saturating_sub(self.len() as u64)
}
pub const fn len(&self) -> usize {
self.current_len
}
pub const fn is_empty(&self) -> bool {
self.len() == 0
}
pub const fn capacity(&self) -> u64 {
self.file_size
}
pub fn new_from_file<P: AsRef<Path>>(
path: P,
current_len: usize,
slot: u64,
) -> io::Result<Self> {
let data = OpenOptions::new()
.read(true)
.write(false)
.create(false)
.open(&path)?;
let file_size = std::fs::metadata(&path)?.len();
AppendVec::sanitize_len_and_size(current_len, file_size as usize)?;
let map = unsafe {
let result = Mmap::map(&data);
if result.is_err() {
// for vm.max_map_count, error is: {code: 12, kind: Other, message: "Cannot allocate memory"}
info!("memory map error: {:?}. This may be because vm.max_map_count is not set correctly.", result);
}
result?
};
let new = AppendVec {
map,
current_len,
file_size,
slot,
};
Ok(new)
}
pub fn new_from_reader<R: Read>(
reader: &mut R,
current_len: usize,
slot: u64,
) -> io::Result<Self> {
let mut map = MmapMut::map_anon(current_len)?;
io::copy(&mut reader.take(current_len as u64), &mut map.as_mut())?;
Ok(AppendVec {
map: map.make_read_only()?,
current_len,
file_size: current_len as u64,
slot,
})
}
/// Get a reference to the data at `offset` of `size` bytes if that slice
/// doesn't overrun the internal buffer. Otherwise return None.
/// Also return the offset of the first byte after the requested data that
/// falls on a 64-byte boundary.
fn get_slice(&self, offset: usize, size: usize) -> Option<(&[u8], usize)> {
let (next, overflow) = offset.overflowing_add(size);
if overflow || next > self.len() {
return None;
}
let data = &self.map[offset..next];
let next = u64_align!(next);
Some((
//UNSAFE: This unsafe creates a slice that represents a chunk of self.map memory
//The lifetime of this slice is tied to &self, since it points to self.map memory
unsafe { std::slice::from_raw_parts(data.as_ptr(), size) },
next,
))
}
/// Return a reference to the type at `offset` if its data doesn't overrun the internal buffer.
/// Otherwise return None. Also return the offset of the first byte after the requested data
/// that falls on a 64-byte boundary.
fn get_type<'a, T>(&self, offset: usize) -> Option<(&'a T, usize)> {
let (data, next) = self.get_slice(offset, mem::size_of::<T>())?;
let ptr: *const T = data.as_ptr() as *const T;
//UNSAFE: The cast is safe because the slice is aligned and fits into the memory
//and the lifetime of the &T is tied to self, which holds the underlying memory map
Some((unsafe { &*ptr }, next))
}
/// Return account metadata for the account at `offset` if its data doesn't overrun
/// the internal buffer. Otherwise return None. Also return the offset of the first byte
/// after the requested data that falls on a 64-byte boundary.
pub fn get_account<'a>(&'a self, offset: usize) -> Option<(StoredAccountMeta<'a>, usize)> {
let (meta, next): (&'a StoredMeta, _) = self.get_type(offset)?;
let (account_meta, next): (&'a AccountMeta, _) = self.get_type(next)?;
let (hash, next): (&'a Hash, _) = self.get_type(next)?;
let (data, next) = self.get_slice(next, meta.data_len as usize)?;
let stored_size = next - offset;
Some((
StoredAccountMeta {
meta,
account_meta,
data,
offset,
stored_size,
hash,
},
next,
))
}
pub const fn slot(&self) -> u64 {
self.slot
}
}
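
The record layout walked by get_account is: StoredMeta, AccountMeta, Hash, then data_len bytes of payload, with each component's end rounded up to the next u64 boundary. A sketch of the offset arithmetic (assuming u64_align! rounds up to a multiple of size_of::<u64>(), which is what the imports suggest):

use std::mem;

// Assumed behaviour of solana_accounts_db's u64_align! macro.
fn u64_align(n: usize) -> usize {
    let a = mem::size_of::<u64>();
    (n + a - 1) & !(a - 1)
}

// Offset of the record following the one at `offset`, mirroring get_account.
fn next_record_offset(
    offset: usize,
    stored_meta: usize,  // size_of::<StoredMeta>()
    account_meta: usize, // size_of::<AccountMeta>()
    hash: usize,         // size_of::<Hash>()
    data_len: usize,
) -> usize {
    let next = u64_align(offset + stored_meta);
    let next = u64_align(next + account_meta);
    let next = u64_align(next + hash);
    u64_align(next + data_len)
}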

View File

@@ -0,0 +1,179 @@
use {
crate::snapshot_utils::{
deserialize_from, parse_append_vec_name, AccountsDbFields, AppendVec, AppendVecIterator,
DeserializableVersionedBank, SerializableAccountStorageEntry, SnapshotError,
SnapshotExtractor, SnapshotResult,
},
log::info,
std::{
fs::File,
io::{BufReader, Read},
path::{Component, Path},
pin::Pin,
time::Instant,
},
tar::{Archive, Entries, Entry},
};
/// Extracts account data from a .tar.zst stream.
pub struct ArchiveSnapshotExtractor<Source>
where
Source: Read + Unpin + 'static,
{
accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
_archive: Pin<Box<Archive<zstd::Decoder<'static, BufReader<Source>>>>>,
entries: Option<Entries<'static, zstd::Decoder<'static, BufReader<Source>>>>,
}
impl<Source> SnapshotExtractor for ArchiveSnapshotExtractor<Source>
where
Source: Read + Unpin + 'static,
{
fn iter(&mut self) -> AppendVecIterator<'_> {
Box::new(self.unboxed_iter())
}
}
impl<Source> ArchiveSnapshotExtractor<Source>
where
Source: Read + Unpin + 'static,
{
pub fn from_reader(source: Source) -> SnapshotResult<Self> {
let tar_stream = zstd::stream::read::Decoder::new(source)?;
let mut archive = Box::pin(Archive::new(tar_stream));
// This is safe as long as we guarantee that entries never gets accessed past drop.
let archive_static = unsafe { &mut *((&mut *archive) as *mut Archive<_>) };
let mut entries = archive_static.entries()?;
// Search for snapshot manifest.
let mut snapshot_file: Option<Entry<_>> = None;
for entry in entries.by_ref() {
let entry = entry?;
let path = entry.path()?;
if Self::is_snapshot_manifest_file(&path) {
snapshot_file = Some(entry);
break;
} else if Self::is_appendvec_file(&path) {
// TODO Support archives where AppendVecs precede snapshot manifests
return Err(SnapshotError::UnexpectedAppendVec);
}
}
let snapshot_file = snapshot_file.ok_or(SnapshotError::NoSnapshotManifest)?;
//let snapshot_file_len = snapshot_file.size();
let snapshot_file_path = snapshot_file.path()?.as_ref().to_path_buf();
info!("Opening snapshot manifest: {:?}", &snapshot_file_path);
let mut snapshot_file = BufReader::new(snapshot_file);
let pre_unpack = Instant::now();
let versioned_bank: DeserializableVersionedBank = deserialize_from(&mut snapshot_file)?;
drop(versioned_bank);
let versioned_bank_post_time = Instant::now();
let accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry> =
deserialize_from(&mut snapshot_file)?;
let accounts_db_fields_post_time = Instant::now();
drop(snapshot_file);
info!(
"Read bank fields in {:?}",
versioned_bank_post_time - pre_unpack
);
info!(
"Read accounts DB fields in {:?}",
accounts_db_fields_post_time - versioned_bank_post_time
);
Ok(ArchiveSnapshotExtractor {
_archive: archive,
accounts_db_fields,
entries: Some(entries),
})
}
fn unboxed_iter(&mut self) -> impl Iterator<Item = SnapshotResult<AppendVec>> + '_ {
self.entries
.take()
.into_iter()
.flatten()
.filter_map(|entry| {
let mut entry = match entry {
Ok(x) => x,
Err(e) => return Some(Err(e.into())),
};
let path = match entry.path() {
Ok(x) => x,
Err(e) => return Some(Err(e.into())),
};
let (slot, id) = path.file_name().and_then(parse_append_vec_name)?;
Some(self.process_entry(&mut entry, slot, id))
})
}
fn process_entry(
&self,
entry: &mut Entry<'static, zstd::Decoder<'static, BufReader<Source>>>,
slot: u64,
id: u64,
) -> SnapshotResult<AppendVec> {
let known_vecs = self
.accounts_db_fields
.0
.get(&slot)
.map(|v| &v[..])
.unwrap_or(&[]);
let known_vec = known_vecs.iter().find(|entry| entry.id == (id as usize));
let known_vec = match known_vec {
None => return Err(SnapshotError::UnexpectedAppendVec),
Some(v) => v,
};
Ok(AppendVec::new_from_reader(
entry,
known_vec.accounts_current_len,
slot,
)?)
}
fn is_snapshot_manifest_file(path: &Path) -> bool {
let mut components = path.components();
if components.next() != Some(Component::Normal("snapshots".as_ref())) {
return false;
}
let slot_number_str_1 = match components.next() {
Some(Component::Normal(slot)) => slot,
_ => return false,
};
// Check if slot number file is valid u64.
if slot_number_str_1
.to_str()
.and_then(|s| s.parse::<u64>().ok())
.is_none()
{
return false;
}
let slot_number_str_2 = match components.next() {
Some(Component::Normal(slot)) => slot,
_ => return false,
};
components.next().is_none() && slot_number_str_1 == slot_number_str_2
}
fn is_appendvec_file(path: &Path) -> bool {
let mut components = path.components();
if components.next() != Some(Component::Normal("accounts".as_ref())) {
return false;
}
let name = match components.next() {
Some(Component::Normal(c)) => c,
_ => return false,
};
components.next().is_none() && parse_append_vec_name(name).is_some()
}
}
impl ArchiveSnapshotExtractor<File> {
pub fn open(path: &Path) -> SnapshotResult<Self> {
Self::from_reader(File::open(path)?)
}
}
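
Usage stays simple despite the self-referential Pin<Box<Archive>> plumbing: open the archive, then drain iter(). A hedged sketch (items from this crate's snapshot_utils assumed in scope, including the SnapshotExtractor trait for iter()):

use std::{fs::File, path::Path};

fn count_accounts(path: &Path) -> anyhow::Result<usize> {
    let mut extractor: ArchiveSnapshotExtractor<File> = ArchiveSnapshotExtractor::open(path)?;
    let mut total = 0usize;
    for append_vec in extractor.iter() {
        // Each entry is one AppendVec streamed straight out of the .tar.zst.
        total += append_vec_iter(&append_vec?).count();
    }
    Ok(total)
}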

View File

@@ -0,0 +1,104 @@
use {
crate::{
snapshot_utils::append_vec::{AppendVec, StoredAccountMeta},
snapshot_utils::solana::{
deserialize_from, AccountsDbFields, DeserializableVersionedBank,
SerializableAccountStorageEntry,
},
},
std::{ffi::OsStr, io::Read, path::Path, str::FromStr},
thiserror::Error,
};
pub mod append_vec;
pub mod archived;
pub mod parallel;
pub mod solana;
pub mod unpacked;
const SNAPSHOTS_DIR: &str = "snapshots";
#[derive(Error, Debug)]
pub enum SnapshotError {
#[error("{0}")]
IOError(#[from] std::io::Error),
#[error("Failed to deserialize: {0}")]
BincodeError(#[from] bincode::Error),
#[error("Missing status cache")]
NoStatusCache,
#[error("No snapshot manifest file found")]
NoSnapshotManifest,
#[error("Unexpected AppendVec")]
UnexpectedAppendVec,
#[error("Failed to create read progress tracking: {0}")]
ReadProgressTracking(String),
}
pub type SnapshotResult<T> = Result<T, SnapshotError>;
pub type AppendVecIterator<'a> = Box<dyn Iterator<Item = SnapshotResult<AppendVec>> + 'a>;
pub trait SnapshotExtractor: Sized {
fn iter(&mut self) -> AppendVecIterator<'_>;
}
fn parse_append_vec_name(name: &OsStr) -> Option<(u64, u64)> {
let name = name.to_str()?;
let mut parts = name.splitn(2, '.');
let slot = u64::from_str(parts.next().unwrap_or(""));
let id = u64::from_str(parts.next().unwrap_or(""));
match (slot, id) {
(Ok(slot), Ok(version)) => Some((slot, version)),
_ => None,
}
}
pub fn append_vec_iter(append_vec: &AppendVec) -> impl Iterator<Item = StoredAccountMetaHandle> {
let mut offset = 0usize;
std::iter::repeat_with(move || {
append_vec.get_account(offset).map(|(_, next_offset)| {
let account = StoredAccountMetaHandle::new(append_vec, offset);
offset = next_offset;
account
})
})
.take_while(|account| account.is_some())
.flatten()
}
pub struct StoredAccountMetaHandle<'a> {
append_vec: &'a AppendVec,
offset: usize,
}
impl<'a> StoredAccountMetaHandle<'a> {
pub const fn new(append_vec: &'a AppendVec, offset: usize) -> StoredAccountMetaHandle {
Self { append_vec, offset }
}
pub fn access(&self) -> Option<StoredAccountMeta<'_>> {
Some(self.append_vec.get_account(self.offset)?.0)
}
}
pub trait ReadProgressTracking {
fn new_read_progress_tracker(
&self,
path: &Path,
rd: Box<dyn Read>,
file_len: u64,
) -> SnapshotResult<Box<dyn Read>>;
}
struct NoopReadProgressTracking {}
impl ReadProgressTracking for NoopReadProgressTracking {
fn new_read_progress_tracker(
&self,
_path: &Path,
rd: Box<dyn Read>,
_file_len: u64,
) -> SnapshotResult<Box<dyn Read>> {
Ok(rd)
}
}
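
Two conventions here are easy to miss: AppendVec files are named "<slot>.<id>", and append_vec_iter advances by asking get_account for each next aligned offset until it returns None. The parsing contract in miniature:

use std::ffi::OsStr;

fn demo() {
    // "<slot>.<id>" parses; anything else is skipped by callers.
    assert_eq!(parse_append_vec_name(OsStr::new("123.7")), Some((123, 7)));
    assert_eq!(parse_append_vec_name(OsStr::new("status_cache")), None);
}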

View File

@@ -0,0 +1,38 @@
use {
crate::snapshot_utils::{AppendVec, AppendVecIterator},
tokio::task::JoinSet,
};
#[async_trait::async_trait]
pub trait AppendVecConsumer {
async fn on_append_vec(&mut self, append_vec: AppendVec) -> anyhow::Result<()>;
}
pub async fn par_iter_append_vecs<F, A>(
iterator: AppendVecIterator<'_>,
create_consumer: F,
num_threads: usize,
) -> anyhow::Result<()>
where
F: Fn() -> A,
A: AppendVecConsumer + Send + 'static,
{
let mut tasks = JoinSet::new();
for append_vec in iterator {
let mut consumer = if tasks.len() >= num_threads {
tasks.join_next().await.expect("checked")??
} else {
create_consumer()
};
tasks.spawn(async move {
consumer.on_append_vec(append_vec?).await?;
Ok::<_, anyhow::Error>(consumer)
});
}
while let Some(result) = tasks.join_next().await {
result??;
}
Ok(())
}
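
The JoinSet acts as a bounded worker pool: once num_threads tasks are in flight, a finished task is joined first and its consumer is recycled for the next AppendVec, so at most num_threads consumers ever exist. A hedged sketch of a consumer (crate items assumed in scope):

struct Counter {
    accounts: usize,
}

#[async_trait::async_trait]
impl AppendVecConsumer for Counter {
    async fn on_append_vec(&mut self, append_vec: AppendVec) -> anyhow::Result<()> {
        self.accounts += append_vec_iter(&append_vec).count();
        Ok(())
    }
}

// Driving it: par_iter_append_vecs(extractor.iter(), || Counter { accounts: 0 }, 8).await?;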

View File

@@ -0,0 +1,127 @@
// Copyright 2022 Solana Foundation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file contains code vendored from https://github.com/solana-labs/solana
use bincode::Options;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use solana_accounts_db::account_storage::meta::StoredMetaWriteVersion;
use solana_accounts_db::accounts_db::BankHashStats;
use solana_accounts_db::ancestors::AncestorsForSerialization;
use solana_accounts_db::blockhash_queue::BlockhashQueue;
use solana_accounts_db::rent_collector::RentCollector;
use solana_frozen_abi_macro::AbiExample;
use solana_runtime::epoch_stakes::EpochStakes;
use solana_runtime::stakes::Stakes;
use solana_sdk::clock::{Epoch, UnixTimestamp};
use solana_sdk::deserialize_utils::default_on_eof;
use solana_sdk::epoch_schedule::EpochSchedule;
use solana_sdk::fee_calculator::{FeeCalculator, FeeRateGovernor};
use solana_sdk::hard_forks::HardForks;
use solana_sdk::hash::Hash;
use solana_sdk::inflation::Inflation;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::slot_history::Slot;
use solana_sdk::stake::state::Delegation;
use std::collections::{HashMap, HashSet};
use std::io::Read;
const MAX_STREAM_SIZE: u64 = 32 * 1024 * 1024 * 1024;
pub fn deserialize_from<R, T>(reader: R) -> bincode::Result<T>
where
R: Read,
T: DeserializeOwned,
{
bincode::options()
.with_limit(MAX_STREAM_SIZE)
.with_fixint_encoding()
.allow_trailing_bytes()
.deserialize_from::<R, T>(reader)
}
#[derive(Default, PartialEq, Eq, Debug, Deserialize)]
struct UnusedAccounts {
unused1: HashSet<Pubkey>,
unused2: HashSet<Pubkey>,
unused3: HashMap<Pubkey, u64>,
}
#[derive(Deserialize)]
#[allow(dead_code)]
pub struct DeserializableVersionedBank {
pub blockhash_queue: BlockhashQueue,
pub ancestors: AncestorsForSerialization,
pub hash: Hash,
pub parent_hash: Hash,
pub parent_slot: Slot,
pub hard_forks: HardForks,
pub transaction_count: u64,
pub tick_height: u64,
pub signature_count: u64,
pub capitalization: u64,
pub max_tick_height: u64,
pub hashes_per_tick: Option<u64>,
pub ticks_per_slot: u64,
pub ns_per_slot: u128,
pub genesis_creation_time: UnixTimestamp,
pub slots_per_year: f64,
pub accounts_data_len: u64,
pub slot: Slot,
pub epoch: Epoch,
pub block_height: u64,
pub collector_id: Pubkey,
pub collector_fees: u64,
pub fee_calculator: FeeCalculator,
pub fee_rate_governor: FeeRateGovernor,
pub collected_rent: u64,
pub rent_collector: RentCollector,
pub epoch_schedule: EpochSchedule,
pub inflation: Inflation,
pub stakes: Stakes<Delegation>,
#[allow(dead_code)]
unused_accounts: UnusedAccounts,
pub epoch_stakes: HashMap<Epoch, EpochStakes>,
pub is_delta: bool,
}
#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq, AbiExample)]
pub struct BankHashInfo {
pub hash: Hash,
pub snapshot_hash: Hash,
pub stats: BankHashStats,
}
#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
pub struct AccountsDbFields<T>(
pub HashMap<Slot, Vec<T>>,
pub StoredMetaWriteVersion,
pub Slot,
pub BankHashInfo,
/// all slots that were roots within the last epoch
#[serde(deserialize_with = "default_on_eof")]
pub Vec<Slot>,
/// slots that were roots within the last epoch for which we care about the hash value
#[serde(deserialize_with = "default_on_eof")]
pub Vec<(Slot, Hash)>,
);
pub type SerializedAppendVecId = usize;
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize)]
pub struct SerializableAccountStorageEntry {
pub id: SerializedAppendVecId,
pub accounts_current_len: usize,
}
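
The manifest is two consecutive bincode values on one stream, so read order is load-bearing: bank fields first, accounts-db fields second, with the 32 GiB limit guarding against corrupt input. The read sequence both extractors follow, in miniature:

use std::io::{BufReader, Read};

fn read_manifest<R: Read>(
    reader: R,
) -> bincode::Result<AccountsDbFields<SerializableAccountStorageEntry>> {
    let mut reader = BufReader::new(reader);
    // Bank fields are deserialized (and dropped) only to advance the stream.
    let _bank: DeserializableVersionedBank = deserialize_from(&mut reader)?;
    deserialize_from(&mut reader)
}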

View File

@@ -0,0 +1,125 @@
use {
crate::snapshot_utils::{
deserialize_from, parse_append_vec_name, AccountsDbFields, AppendVec, AppendVecIterator,
DeserializableVersionedBank, ReadProgressTracking, SerializableAccountStorageEntry,
SnapshotError, SnapshotExtractor, SnapshotResult, SNAPSHOTS_DIR,
},
itertools::Itertools,
log::info,
solana_runtime::snapshot_utils::SNAPSHOT_STATUS_CACHE_FILENAME,
std::{
fs::OpenOptions,
io::BufReader,
path::{Path, PathBuf},
str::FromStr,
time::Instant,
},
};
/// Extracts account data from snapshots that were unarchived to a file system.
pub struct UnpackedSnapshotExtractor {
root: PathBuf,
accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
}
impl SnapshotExtractor for UnpackedSnapshotExtractor {
fn iter(&mut self) -> AppendVecIterator<'_> {
Box::new(self.unboxed_iter())
}
}
impl UnpackedSnapshotExtractor {
pub fn open(
path: &Path,
progress_tracking: Box<dyn ReadProgressTracking>,
) -> SnapshotResult<Self> {
let snapshots_dir = path.join(SNAPSHOTS_DIR);
let status_cache = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
if !status_cache.is_file() {
return Err(SnapshotError::NoStatusCache);
}
let snapshot_files = snapshots_dir.read_dir()?;
let snapshot_file_path = snapshot_files
.filter_map(|entry| entry.ok())
.find(|entry| u64::from_str(&entry.file_name().to_string_lossy()).is_ok())
.map(|entry| entry.path().join(entry.file_name()))
.ok_or(SnapshotError::NoSnapshotManifest)?;
info!("Opening snapshot manifest: {:?}", snapshot_file_path);
let snapshot_file = OpenOptions::new().read(true).open(&snapshot_file_path)?;
let snapshot_file_len = snapshot_file.metadata()?.len();
let snapshot_file = progress_tracking.new_read_progress_tracker(
&snapshot_file_path,
Box::new(snapshot_file),
snapshot_file_len,
)?;
let mut snapshot_file = BufReader::new(snapshot_file);
let pre_unpack = Instant::now();
let versioned_bank: DeserializableVersionedBank = deserialize_from(&mut snapshot_file)?;
drop(versioned_bank);
let versioned_bank_post_time = Instant::now();
let accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry> =
deserialize_from(&mut snapshot_file)?;
let accounts_db_fields_post_time = Instant::now();
drop(snapshot_file);
info!(
"Read bank fields in {:?}",
versioned_bank_post_time - pre_unpack
);
info!(
"Read accounts DB fields in {:?}",
accounts_db_fields_post_time - versioned_bank_post_time
);
Ok(UnpackedSnapshotExtractor {
root: path.to_path_buf(),
accounts_db_fields,
})
}
pub fn unboxed_iter(&self) -> impl Iterator<Item = SnapshotResult<AppendVec>> + '_ {
std::iter::once(self.iter_streams())
.flatten_ok()
.flatten_ok()
}
fn iter_streams(&self) -> SnapshotResult<impl Iterator<Item = SnapshotResult<AppendVec>> + '_> {
let accounts_dir = self.root.join("accounts");
Ok(accounts_dir
.read_dir()?
.filter_map(|f| f.ok())
.filter_map(|f| {
let name = f.file_name();
parse_append_vec_name(&f.file_name()).map(move |parsed| (parsed, name))
})
.map(move |((slot, version), name)| {
self.open_append_vec(slot, version, &accounts_dir.join(name))
}))
}
fn open_append_vec(&self, slot: u64, id: u64, path: &Path) -> SnapshotResult<AppendVec> {
let known_vecs = self
.accounts_db_fields
.0
.get(&slot)
.map(|v| &v[..])
.unwrap_or(&[]);
let known_vec = known_vecs.iter().find(|entry| entry.id == (id as usize));
let known_vec = match known_vec {
None => return Err(SnapshotError::UnexpectedAppendVec),
Some(v) => v,
};
Ok(AppendVec::new_from_file(
path,
known_vec.accounts_current_len,
slot,
)?)
}
}
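
The directory layout this extractor expects (inferred from the path handling above) is <root>/snapshots/status_cache, a <root>/snapshots/<slot>/<slot> manifest, and <root>/accounts/<slot>.<id> AppendVec files. A hedged sketch of opening one with a no-op progress tracker:

use std::path::Path;

struct Quiet;

impl ReadProgressTracking for Quiet {
    fn new_read_progress_tracker(
        &self,
        _path: &Path,
        rd: Box<dyn std::io::Read>,
        _file_len: u64,
    ) -> SnapshotResult<Box<dyn std::io::Read>> {
        Ok(rd) // pass the reader through untouched
    }
}

fn open_unpacked(root: &Path) -> SnapshotResult<UnpackedSnapshotExtractor> {
    UnpackedSnapshotExtractor::open(root, Box::new(Quiet))
}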

View File

@@ -602,7 +602,11 @@ impl AccountStorageInterface for TokenProgramAccountsStorage {
         ) {
             Ok(res) => res,
             Err(e) => {
-                log::error!("Token program account {} was not able to identified {}", account_data.pubkey.to_string(), e);
+                log::error!(
+                    "Token program account {} was not able to identified {}",
+                    account_data.pubkey.to_string(),
+                    e
+                );
                 return;
             }
         };
@@ -624,7 +628,7 @@ impl AccountStorageInterface for TokenProgramAccountsStorage {
             commitment,
             self.confirmed_slot.load(Ordering::Relaxed),
         )
-        .get(0)
+        .first()
         .unwrap()
         {
             Some((processed_account, slot)) => Ok(token_program_account_to_solana_account(
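
The second hunk is a clippy get_first fix; the two calls are equivalent and .first() just states the intent:

fn demo() {
    let v = [1, 2, 3];
    assert_eq!(v.get(0), v.first()); // both return Option<&i32>
}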

View File

@@ -94,7 +94,9 @@ pub fn get_token_program_account_type(
     match type_account {
         0 => {
             //mint
-            let mint = spl_token_2022::state::Mint::unpack_unchecked(&account_data.account.data.data())?;
+            let mint = spl_token_2022::state::Mint::unpack_unchecked(
+                &account_data.account.data.data(),
+            )?;
             Ok(TokenProgramAccountType::Mint(MintAccount {
                 program: crate::account_types::Program::Token2022Program,
                 pubkey: account_data.pubkey,
@@ -108,8 +110,9 @@ pub fn get_token_program_account_type(
             }))
         }
         1 => {
-            let token_account =
-                spl_token_2022::state::Account::unpack_unchecked(&account_data.account.data.data())?;
+            let token_account = spl_token_2022::state::Account::unpack_unchecked(
+                &account_data.account.data.data(),
+            )?;
             let mint_index = get_or_create_mint_index(
                 token_account.mint,
                 mint_index_by_pubkey,
@@ -143,8 +146,9 @@ pub fn get_token_program_account_type(
             }))
         }
         2 => {
-            let multi_sig =
-                spl_token_2022::state::Multisig::unpack_unchecked(&account_data.account.data.data())?;
+            let multi_sig = spl_token_2022::state::Multisig::unpack_unchecked(
+                &account_data.account.data.data(),
+            )?;
             Ok(TokenProgramAccountType::MultiSig(
                 MultiSig {
                     program: crate::account_types::Program::Token2022Program,
@@ -215,7 +219,8 @@ pub fn get_token_program_account_type(
             }))
         } else {
             // multisig
-            let multi_sig = spl_token::state::Multisig::unpack_unchecked(&account_data.account.data.data())?;
+            let multi_sig =
+                spl_token::state::Multisig::unpack_unchecked(&account_data.account.data.data())?;
             Ok(TokenProgramAccountType::MultiSig(
                 MultiSig {
                     program: crate::account_types::Program::TokenProgram,