Simulate from snapshot uses accountsdb

This commit is contained in:
mango-dee 2024-12-25 08:02:34 +08:00
parent 05610938a8
commit 6493d627dd
19 changed files with 280 additions and 974 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
/target
.idea
.run

6
Cargo.lock generated
View File

@ -2657,7 +2657,7 @@ dependencies = [
]
[[package]]
name = "lite-accounts-from-storage"
name = "lite-accounts-from-snapshot"
version = "0.1.0"
dependencies = [
"anyhow",
@ -2716,11 +2716,13 @@ dependencies = [
"clap 4.5.20",
"env_logger",
"futures",
"geyser-grpc-connector",
"itertools 0.10.5",
"jsonrpsee",
"lazy_static",
"lite-account-manager-common",
"lite-account-storage",
"lite-accounts-from-snapshot",
"lite-token-account-storage",
"log",
"memmap2",
@ -2740,6 +2742,8 @@ dependencies = [
"tokio",
"tower 0.4.13",
"tower-http",
"tracing-subscriber",
"yellowstone-grpc-proto",
"zstd 0.13.2",
]

View File

@ -6,4 +6,7 @@ to confirmed and finalized as their slots are finalized.
This project also makes it possible to create a snapshot of all account states at the current moment.
This project is used in lite-rpc and quic geyser plugin.
This project is used in lite-rpc and quic geyser plugin.
solana account oQPnhXAbLbMuKHESaGrbXT17CyvWCpLyERSJA9HCYd7 -u http://localhost:10700
solana account oQPnhXAbLbMuKHESaGrbXT17CyvWCpLyERSJA9HCYd7 -u http://139.178.82.223:10700

View File

@ -2,9 +2,13 @@ use std::{
collections::{BTreeSet, HashMap},
sync::{Arc, Mutex, RwLockReadGuard},
};
use std::collections::BTreeMap;
use std::sync::RwLock;
use crate::account_data_by_commitment::AccountDataByCommitment;
use dashmap::DashMap;
use prometheus::{IntGauge, opts, register_int_gauge};
use solana_sdk::{pubkey::Pubkey, slot_history::Slot};
use lite_account_manager_common::{
account_data::AccountData,
account_filter::{AccountFilter, AccountFilterType},
@ -13,10 +17,8 @@ use lite_account_manager_common::{
commitment::Commitment,
slot_info::SlotInfo,
};
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_sdk::{pubkey::Pubkey, slot_history::Slot};
use std::collections::BTreeMap;
use std::sync::RwLock;
use crate::account_data_by_commitment::AccountDataByCommitment;
lazy_static::lazy_static! {
static ref ACCOUNT_STORED_IN_MEMORY: IntGauge =
@ -36,6 +38,7 @@ struct SlotStatus {
}
type AccountIndex = usize;
pub struct InmemoryAccountStore {
pubkey_to_account_index: Arc<DashMap<Pubkey, AccountIndex>>,
accounts_by_owner: Arc<DashMap<Pubkey, Arc<RwLock<BTreeSet<AccountIndex>>>>>,
@ -264,6 +267,7 @@ impl AccountStorageInterface for InmemoryAccountStore {
}
/// Stores `account_data` by forwarding it to `update_account` with
/// `Commitment::Finalized`.
fn initilize_or_update_account(&self, account_data: AccountData) {
    // Deliberately no per-account output here: printing every pubkey to stdout
    // was a debugging leftover and floods the console on bulk loads.
    self.update_account(account_data, Commitment::Finalized);
}
@ -437,11 +441,11 @@ impl AccountStorageInterface for InmemoryAccountStore {
if (prev_account_data.account.owner != account_data.account.owner
|| account_data.account.lamports == 0)
&& self.update_owner_delete_if_necessary(
&prev_account_data,
&account_data,
writable_account_index,
commitment,
)
&prev_account_data,
&account_data,
writable_account_index,
commitment,
)
{
self.pubkey_to_account_index.remove(&account_data.pubkey);
writable_lk.delete();
@ -477,13 +481,15 @@ impl AccountStorageInterface for InmemoryAccountStore {
mod tests {
use std::{collections::HashSet, sync::Arc};
use crate::inmemory_account_store::InmemoryAccountStore;
use base64::Engine;
use itertools::Itertools;
use rand::{Rng, rngs::ThreadRng};
use solana_sdk::{account::Account as SolanaAccount, pubkey::Pubkey, slot_history::Slot};
use lite_account_manager_common::{
account_data::{Account, AccountData, CompressionMethod},
account_filter::{
AccountFilter, AccountFilterType, AccountFilters, MemcmpFilter, MemcmpFilterData,
AccountFilter, AccountFilters, AccountFilterType, MemcmpFilter, MemcmpFilterData,
},
account_filters_interface::AccountFiltersStoreInterface,
account_store_interface::{AccountLoadingError, AccountStorageInterface},
@ -491,8 +497,8 @@ mod tests {
simple_filter_store::SimpleFilterStore,
slot_info::SlotInfo,
};
use rand::{rngs::ThreadRng, Rng};
use solana_sdk::{account::Account as SolanaAccount, pubkey::Pubkey, slot_history::Slot};
use crate::inmemory_account_store::InmemoryAccountStore;
fn create_random_account(
rng: &mut ThreadRng,
@ -1927,13 +1933,13 @@ mod tests {
store
.get_program_accounts(program_1, None, Commitment::Confirmed)
.unwrap(),
vec![account_3_slot5.clone(),]
vec![account_3_slot5.clone()]
);
assert_eq!(
store
.get_program_accounts(program_1, None, Commitment::Finalized)
.unwrap(),
vec![account_3_slot5.clone(),]
vec![account_3_slot5.clone()]
);
assert_eq!(
store
@ -2011,7 +2017,7 @@ mod tests {
store
.get_program_accounts(program_1, None, Commitment::Finalized)
.unwrap(),
vec![account_3_slot5.clone(),]
vec![account_3_slot5.clone()]
);
assert_eq!(
store

View File

@ -1,5 +1,5 @@
[package]
name = "lite-accounts-from-storage"
name = "lite-accounts-from-snapshot"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"

View File

@ -5,7 +5,7 @@ use std::str::FromStr;
use solana_runtime::snapshot_archive_info::SnapshotArchiveInfoGetter;
use solana_sdk::epoch_schedule::Slot;
use lite_accounts_from_storage::{Config, HostUrl, Loader};
use lite_accounts_from_snapshot::{Config, HostUrl, Loader};
pub struct TestConsumer {}

View File

@ -27,7 +27,7 @@ use yellowstone_grpc_proto::geyser::{
use lite_account_manager_common::account_data::{Account, AccountData, Data};
use lite_account_manager_common::account_store_interface::AccountStorageInterface;
use lite_account_storage::accountsdb::AccountsDb;
use lite_accounts_from_storage::{import, Config, HostUrl, Loader};
use lite_accounts_from_snapshot::{import, Config, HostUrl, Loader};
type AtomicSlot = Arc<AtomicU64>;
@ -127,7 +127,7 @@ fn account_stream(mut geyser_messages_rx: Receiver<Message>) -> Receiver<Account
_ => {}
},
None => {
log::warn!("multiplexer channel closed - aborting");
warn!("multiplexer channel closed - aborting");
return;
}
Some(Message::Connecting(_)) => {}

View File

@ -46,9 +46,9 @@ pub async fn latest_full_snapshot(
let full_slot = parts[0].parse::<u64>().unwrap();
debug!("{} has snapshot of {}", &host, full_slot);
if full_slot < not_before_slot {
continue;
}
// if full_slot < not_before_slot {
// continue;
// }
let hash = SnapshotHash(Hash::from_str(parts[1]).unwrap());
snapshots.push(LatestFullSnapshot {
@ -85,9 +85,9 @@ pub async fn latest_incremental_snapshot(
let incremental_slot = parts[1].parse::<u64>().unwrap();
debug!("{} has incremental snapshot of {}", &host, incremental_slot);
if incremental_slot < not_before_incremental_slot {
continue;
}
// if incremental_slot < not_before_incremental_slot {
// continue;
// }
let hash = SnapshotHash(Hash::from_str(parts[2]).unwrap());
snapshots.push(LatestIncrementalSnapshot {

View File

@ -3,7 +3,7 @@ use std::fmt::Display;
use serde::Serializer;
use solana_sdk::commitment_config::CommitmentConfig;
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
#[repr(u8)]
pub enum Commitment {
Processed = 0,
@ -23,6 +23,17 @@ impl From<CommitmentConfig> for Commitment {
}
}
/// Converts a raw numeric commitment level into a [`Commitment`].
///
/// `0` maps to `Processed`, `1` to `Confirmed` and `2` to `Finalized`.
///
/// # Panics
///
/// Panics with the offending value for any other input.
impl From<i32> for Commitment {
    fn from(value: i32) -> Self {
        match value {
            0 => Commitment::Processed,
            1 => Commitment::Confirmed,
            2 => Commitment::Finalized,
            // This arm is reachable for any out-of-range input, so report the
            // value instead of claiming the branch can never be taken.
            other => panic!("invalid commitment value: {}", other),
        }
    }
}
impl Display for Commitment {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {

View File

@ -1,8 +1,17 @@
use solana_sdk::clock::Slot;
use crate::commitment::Commitment;
/// Identifies a slot together with its parent and root slots.
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub struct SlotInfo {
/// The slot this record describes.
pub slot: Slot,
/// Parent of `slot`.
pub parent: Slot,
/// Root slot associated with `slot`.
// NOTE(review): exact semantics of `root` are not visible here - confirm against producers.
pub root: Slot,
}
/// A [`SlotInfo`] paired with a [`Commitment`] level.
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub struct SlotInfoWithCommitment {
/// The slot, parent and root values.
pub info: SlotInfo,
/// Commitment level carried alongside `info`.
pub commitment: Commitment,
}

View File

@ -7,6 +7,7 @@ edition = "2021"
lite-account-manager-common = { workspace = true }
lite-account-storage = { workspace = true }
lite-token-account-storage = { workspace = true }
lite-accounts-from-snapshot = { workspace = true}
solana-sdk = {workspace = true}
solana-client = {workspace = true}
@ -39,6 +40,9 @@ jsonrpsee = { version = "0.20.0", features = ["macros", "full"] }
tower = "0.4.13"
tower-http = { version = "0.4.0", features = ["full"] }
yellowstone-grpc-proto = { workspace = true }
geyser-grpc-connector = { workspace = true }
quic-geyser-client = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", tag = "v0.1.6_solana+2.0.15" }
quic-geyser-common = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", tag = "v0.1.6_solana+2.0.15" }
quic-geyser-common = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", tag = "v0.1.6_solana+2.0.15" }
tracing-subscriber = "0.3.19"

View File

@ -1,6 +1,19 @@
use std::{fs::File, path::PathBuf, str::FromStr, sync::Arc};
use std::{env, fs::File, path::PathBuf, str::FromStr, sync::Arc};
use std::sync::Once;
use std::time::Duration;
use clap::Parser;
use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig};
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use log::{debug, info};
use quic_geyser_common::{
filters::Filter, message::Message, types::connections_parameters::ConnectionParameters,
};
use solana_accounts_db::accounts_index::IndexLimitMb::InMemOnly;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use tokio::sync::mpsc::Receiver;
use tokio::task::JoinHandle;
use cli::Args;
use lite_account_manager_common::{
account_data::{Account, AccountData, CompressionMethod, Data},
@ -8,161 +21,96 @@ use lite_account_manager_common::{
commitment::Commitment,
slot_info::SlotInfo,
};
use lite_account_manager_common::account_filter::AccountFilter;
use lite_account_manager_common::account_filters_interface::AccountFiltersStoreInterface;
use lite_account_manager_common::simple_filter_store::SimpleFilterStore;
use lite_account_manager_common::slot_info::SlotInfoWithCommitment;
use lite_account_storage::accountsdb::AccountsDb;
use lite_account_storage::inmemory_account_store::InmemoryAccountStore;
use lite_token_account_storage::{
inmemory_token_account_storage::InmemoryTokenAccountStorage,
inmemory_token_storage::TokenProgramAccountsStorage,
};
use quic_geyser_common::{
filters::Filter, message::Message, types::connections_parameters::ConnectionParameters,
};
use snapshot_utils::{append_vec_iter, archived::ArchiveSnapshotExtractor, SnapshotExtractor};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use crate::rpc_server::RpcServerImpl;
use crate::util::{all_accounts, import_snapshots, process_stream};
pub mod cli;
pub mod rpc_server;
pub mod snapshot_utils;
mod cli;
mod rpc_server;
mod util;
#[tokio::main(worker_threads = 2)]
async fn main() {
env_logger::init_from_env(
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
tracing_subscriber::fmt::init();
let grpc_addr = env::var("GRPC_ADDR").expect("need grpc url");
let grpc_x_token = env::var("GRPC_X_TOKEN").ok();
info!(
"Using grpc source on {} ({})",
grpc_addr,
grpc_x_token.is_some()
);
let args = Args::parse();
println!("tester args : {:?}", args);
let Args {
snapshot_archive_path,
quic_url,
} = args;
let token_account_storage = Arc::new(InmemoryTokenAccountStorage::default());
let token_storage: Arc<dyn AccountStorageInterface> =
Arc::new(TokenProgramAccountsStorage::new(token_account_storage));
// fill from quic geyser stream
if let Some(quic_url) = quic_url {
let token_storage = token_storage.clone();
tokio::spawn(async move {
let (quic_client, mut reciever, _jh) =
quic_geyser_client::non_blocking::client::Client::new(
quic_url,
ConnectionParameters::default(),
)
.await
.unwrap();
quic_client
.subscribe(vec![Filter::AccountsAll, Filter::Slot])
.await
.unwrap();
while let Some(message) = reciever.recv().await {
match message {
Message::AccountMsg(account) => {
let compression_method = match account.compression_type {
quic_geyser_common::compression::CompressionType::None => {
CompressionMethod::None
}
quic_geyser_common::compression::CompressionType::Lz4Fast(v) => {
CompressionMethod::Lz4(v)
}
quic_geyser_common::compression::CompressionType::Lz4(v) => {
CompressionMethod::Lz4(v)
}
};
let account_data = AccountData {
pubkey: account.pubkey,
account: Arc::new(Account {
lamports: account.lamports,
data: Data::new(&account.data, compression_method),
owner: account.owner,
executable: account.executable,
rent_epoch: account.rent_epoch,
}),
updated_slot: account.slot_identifier.slot,
write_version: account.write_version,
};
token_storage.update_account(
account_data,
lite_account_manager_common::commitment::Commitment::Processed,
);
}
Message::SlotMsg(slot_msg) => {
if slot_msg.commitment_config == CommitmentConfig::confirmed()
|| slot_msg.commitment_config == CommitmentConfig::finalized()
{
let commitment =
if slot_msg.commitment_config == CommitmentConfig::confirmed() {
Commitment::Confirmed
} else {
Commitment::Finalized
};
token_storage.process_slot_data(
SlotInfo {
slot: slot_msg.slot,
parent: slot_msg.parent,
root: 0,
},
commitment,
);
}
}
_ => {
//not supported
}
}
}
println!("stopping geyser stream");
log::error!("stopping geyser stream");
});
}
// load accounts from snapshot
let bk = {
let token_storage = token_storage.clone();
let token_program =
Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap();
tokio::task::spawn_blocking(move || {
let archive_path = PathBuf::from_str(snapshot_archive_path.as_str()).unwrap();
let mut loader: ArchiveSnapshotExtractor<File> =
ArchiveSnapshotExtractor::open(&archive_path).unwrap();
for vec in loader.iter() {
let append_vec = vec.unwrap();
// info!("size: {:?}", append_vec.len());
for handle in append_vec_iter(&append_vec) {
let stored = handle.access().unwrap();
if stored.account_meta.owner != token_program {
continue;
}
let data = stored.data;
let compressed_data = Data::new(data, CompressionMethod::None);
token_storage.initilize_or_update_account(AccountData {
pubkey: stored.meta.pubkey,
account: Arc::new(Account {
lamports: stored.account_meta.lamports,
data: compressed_data,
owner: stored.account_meta.owner,
executable: stored.account_meta.executable,
rent_epoch: stored.account_meta.rent_epoch,
}),
updated_slot: 0,
write_version: 0,
});
}
}
})
let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(25),
request_timeout: Duration::from_secs(25),
subscribe_timeout: Duration::from_secs(25),
receive_timeout: Duration::from_secs(25),
};
// await for loading of snapshot to finish
bk.await.unwrap();
log::info!("Storage Initialized with snapshot");
let rpc_server = RpcServerImpl::new(token_storage);
let config = GrpcSourceConfig::new(grpc_addr, grpc_x_token, None, timeouts.clone());
let (autoconnect_tx, geyser_rx) = tokio::sync::mpsc::channel(10);
let (_exit_tx, exit_rx) = tokio::sync::broadcast::channel::<()>(1);
let _all_accounts = create_geyser_autoconnection_task_with_mpsc(
config.clone(),
all_accounts(),
autoconnect_tx.clone(),
exit_rx.resubscribe(),
);
let db = Arc::new(AccountsDb::new());
let (mut slots_rx, mut accounts_rx) = process_stream(geyser_rx);
process_account_updates(db.clone(), accounts_rx);
info!("Waiting for most recent finalised block");
let slot = loop {
let slot = slots_rx.recv().await.unwrap();
debug!("slot {} - {}", slot.info.slot, slot.commitment);
if slot.commitment == Commitment::Finalized {
break slot;
}
};
process_slot_updates(db.clone(), slots_rx);
import_snapshots(slot.info.slot, db.clone());
let rpc_server = RpcServerImpl::new(db.clone());
info!("Storage Initialized with snapshot");
let jh = RpcServerImpl::start_serving(rpc_server, 10700)
.await
.unwrap();
let _ = jh.await;
}
fn process_slot_updates(db: Arc<AccountsDb>, mut slots_rx: Receiver<SlotInfoWithCommitment>) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
let slot = slots_rx.recv().await.unwrap();
db.process_slot_data(slot.info, slot.commitment);
}
})
}
fn process_account_updates(db: Arc<AccountsDb>, mut accounts_rx: Receiver<AccountData>) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
let account = accounts_rx.recv().await.unwrap();
db.initilize_or_update_account(account);
}
})
}

View File

@ -1,229 +0,0 @@
// Copyright 2022 Solana Foundation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file contains code vendored from https://github.com/solana-labs/solana
// Source: solana/runtime/src/append_vec.rs
use {
log::*,
memmap2::{Mmap, MmapMut},
solana_accounts_db::{
account_storage::meta::{AccountMeta, StoredMeta},
accounts_file::ALIGN_BOUNDARY_OFFSET,
append_vec::MAXIMUM_APPEND_VEC_FILE_SIZE,
u64_align,
},
solana_sdk::{
account::{Account, AccountSharedData},
hash::Hash,
},
std::{
convert::TryFrom,
fs::OpenOptions,
io::{self, Read},
mem,
path::Path,
},
};
/// References to account data stored elsewhere. Getting an `Account` requires cloning
/// (see `StoredAccountMeta::clone_account()`).
#[derive(PartialEq, Eq, Debug)]
pub struct StoredAccountMeta<'a> {
/// Per-entry header; `AppendVec::get_account` uses its `data_len` to size `data`.
pub meta: &'a StoredMeta,
/// Account fields (lamports, owner, executable, rent epoch) copied by `clone_account`.
pub account_meta: &'a AccountMeta,
/// Raw account data bytes.
pub data: &'a [u8],
/// Byte offset of this entry inside its `AppendVec`.
pub offset: usize,
/// Distance in bytes from `offset` to the aligned offset following this entry.
pub stored_size: usize,
/// Hash read from between the account fields and the data bytes.
pub hash: &'a Hash,
}
impl<'a> StoredAccountMeta<'a> {
    /// Builds an owned `AccountSharedData` by copying the referenced account
    /// fields and data bytes.
    pub fn clone_account(&self) -> AccountSharedData {
        let fields = self.account_meta;
        let owned = Account {
            lamports: fields.lamports,
            owner: fields.owner,
            executable: fields.executable,
            rent_epoch: fields.rent_epoch,
            // The data slice borrows from elsewhere, so it has to be copied.
            data: self.data.to_vec(),
        };
        AccountSharedData::from(owned)
    }
}
/// A read-only memory map holding stored `Account` entries, created either from a file
/// (`new_from_file`) or by copying bytes from a reader (`new_from_reader`).
///
/// NOTE(review): the upstream description mentioned an internal `append_lock` and concurrent
/// appends; this struct has no such field and its `impl` in this file offers no append operation.
pub struct AppendVec {
/// The memory map that holds the bytes of every stored item.
map: Mmap,
/// The number of bytes used to store items, not the number of items.
current_len: usize,
/// The number of bytes available for storing items.
file_size: u64,
/// Slot value supplied to the constructor; returned by `slot()`.
slot: u64,
}
impl AppendVec {
    /// Checks that `file_size` is non-zero, does not exceed
    /// `MAXIMUM_APPEND_VEC_FILE_SIZE`, and is at least `current_len`.
    fn sanitize_len_and_size(current_len: usize, file_size: usize) -> io::Result<()> {
        if file_size == 0 {
            Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                format!("too small file size {} for AppendVec", file_size),
            ))
        } else if usize::try_from(MAXIMUM_APPEND_VEC_FILE_SIZE)
            .map(|max| file_size > max)
            .unwrap_or(true)
        {
            // If the maximum does not even fit in `usize`, the conversion fails
            // and the size is rejected as too large.
            Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                format!("too large file size {} for AppendVec", file_size),
            ))
        } else if current_len > file_size {
            Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                format!("current_len is larger than file size ({})", file_size),
            ))
        } else {
            Ok(())
        }
    }

    /// how many more bytes can be stored in this append vec
    pub const fn remaining_bytes(&self) -> u64 {
        (self.capacity()).saturating_sub(self.len() as u64)
    }

    /// Number of bytes used to store items.
    pub const fn len(&self) -> usize {
        self.current_len
    }

    /// Returns `true` when no bytes are in use.
    pub const fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Number of bytes available for storing items.
    pub const fn capacity(&self) -> u64 {
        self.file_size
    }

    /// Memory-maps the file at `path` read-only.
    ///
    /// `current_len` is the number of bytes in use and is validated against
    /// the file's size; `slot` is stored as-is.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or inspected, if the
    /// length/size validation fails, or if the memory mapping fails.
    pub fn new_from_file<P: AsRef<Path>>(
        path: P,
        current_len: usize,
        slot: u64,
    ) -> io::Result<Self> {
        let data = OpenOptions::new()
            .read(true)
            .write(false)
            .create(false)
            .open(&path)?;

        let file_size = std::fs::metadata(&path)?.len();
        AppendVec::sanitize_len_and_size(current_len, file_size as usize)?;

        // NOTE(review): mapping a file is `unsafe` because the mapping's
        // contents are not protected from changes made to the file elsewhere.
        let map = unsafe {
            let result = Mmap::map(&data);
            if result.is_err() {
                // for vm.max_map_count, error is: {code: 12, kind: Other, message: "Cannot allocate memory"}
                info!("memory map error: {:?}. This may be because vm.max_map_count is not set correctly.", result);
            }
            result?
        };

        let new = AppendVec {
            map,
            current_len,
            file_size,
            slot,
        };

        Ok(new)
    }

    /// Copies up to `current_len` bytes from `reader` into an anonymous memory
    /// map and makes that map read-only.
    ///
    /// # Errors
    ///
    /// Returns an error if the map cannot be created, the copy fails, or the
    /// map cannot be made read-only.
    pub fn new_from_reader<R: Read>(
        reader: &mut R,
        current_len: usize,
        slot: u64,
    ) -> io::Result<Self> {
        let mut map = MmapMut::map_anon(current_len)?;
        io::copy(&mut reader.take(current_len as u64), &mut map.as_mut())?;
        Ok(AppendVec {
            map: map.make_read_only()?,
            current_len,
            file_size: current_len as u64,
            slot,
        })
    }

    /// Get a reference to the data at `offset` of `size` bytes if that slice
    /// doesn't overrun the internal buffer. Otherwise return None.
    /// Also return the offset of the first byte after the requested data that
    /// falls on a 64-byte boundary.
    fn get_slice(&self, offset: usize, size: usize) -> Option<(&[u8], usize)> {
        let (next, overflow) = offset.overflowing_add(size);
        if overflow || next > self.len() {
            return None;
        }
        // `offset..next` is exactly `size` bytes long, so the borrowed
        // sub-slice can be returned directly; no raw-pointer reconstruction
        // is needed.
        let data = &self.map[offset..next];
        let next = u64_align!(next);

        Some((data, next))
    }

    /// Return a reference to the type at `offset` if its data doesn't overrun the internal buffer.
    /// Otherwise return None. Also return the offset of the first byte after the requested data
    /// that falls on a 64-byte boundary.
    ///
    /// The returned reference borrows from `self`, so it cannot outlive the
    /// memory map it points into.
    fn get_type<T>(&self, offset: usize) -> Option<(&T, usize)> {
        let (data, next) = self.get_slice(offset, mem::size_of::<T>())?;
        let ptr: *const T = data.as_ptr() as *const T;
        // SAFETY: `get_slice` guarantees `size_of::<T>()` readable bytes at
        // `ptr` inside `self.map`, and the result is tied to `&self`.
        // NOTE(review): alignment of `ptr` for `T` is assumed, not checked here.
        Some((unsafe { &*ptr }, next))
    }

    /// Return account metadata for the account at `offset` if its data doesn't overrun
    /// the internal buffer. Otherwise return None. Also return the offset of the first byte
    /// after the requested data that falls on a 64-byte boundary.
    pub fn get_account<'a>(&'a self, offset: usize) -> Option<(StoredAccountMeta<'a>, usize)> {
        // Entry layout as read here: stored meta, account meta, hash, then
        // `meta.data_len` bytes of account data.
        let (meta, next): (&'a StoredMeta, _) = self.get_type(offset)?;
        let (account_meta, next): (&'a AccountMeta, _) = self.get_type(next)?;
        let (hash, next): (&'a Hash, _) = self.get_type(next)?;
        let (data, next) = self.get_slice(next, meta.data_len as usize)?;
        let stored_size = next - offset;
        Some((
            StoredAccountMeta {
                meta,
                account_meta,
                data,
                offset,
                stored_size,
                hash,
            },
            next,
        ))
    }

    /// Slot value supplied when this `AppendVec` was created.
    pub const fn slot(&self) -> u64 {
        self.slot
    }
}

View File

@ -1,179 +0,0 @@
use {
crate::snapshot_utils::{
deserialize_from, parse_append_vec_name, AccountsDbFields, AppendVec, AppendVecIterator,
DeserializableVersionedBank, SerializableAccountStorageEntry, SnapshotError,
SnapshotExtractor, SnapshotResult,
},
log::info,
std::{
fs::File,
io::{BufReader, Read},
path::{Component, Path},
pin::Pin,
time::Instant,
},
tar::{Archive, Entries, Entry},
};
/// Extracts account data from a .tar.zst stream.
pub struct ArchiveSnapshotExtractor<Source>
where
Source: Read + Unpin + 'static,
{
/// Accounts DB fields deserialized from the snapshot manifest; `process_entry`
/// looks up each append vec's expected length here.
accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
/// Pinned, heap-allocated tar archive. Kept alive because `entries` borrows from it.
_archive: Pin<Box<Archive<zstd::Decoder<'static, BufReader<Source>>>>>,
/// Remaining archive entries. `from_reader` creates it from a reference to `_archive`
/// whose lifetime was extended to `'static`; `unboxed_iter` takes it out, leaving `None`.
entries: Option<Entries<'static, zstd::Decoder<'static, BufReader<Source>>>>,
}
impl<Source> SnapshotExtractor for ArchiveSnapshotExtractor<Source>
where
    Source: Read + Unpin + 'static,
{
    /// Returns the extractor's append-vec iterator as a boxed trait object.
    fn iter(&mut self) -> AppendVecIterator<'_> {
        let append_vecs = self.unboxed_iter();
        Box::new(append_vecs)
    }
}
impl<Source> ArchiveSnapshotExtractor<Source>
where
Source: Read + Unpin + 'static,
{
/// Opens a zstd-compressed tar stream and reads its snapshot manifest.
///
/// Entries are scanned in order until the manifest is found; the bank fields and then the
/// accounts DB fields are deserialized from it. The remaining entries are kept for
/// `unboxed_iter`.
///
/// # Errors
///
/// Returns `SnapshotError::UnexpectedAppendVec` if an append vec file is seen before the
/// manifest, `SnapshotError::NoSnapshotManifest` if no manifest is found, and propagates
/// I/O and deserialization errors.
pub fn from_reader(source: Source) -> SnapshotResult<Self> {
let tar_stream = zstd::stream::read::Decoder::new(source)?;
let mut archive = Box::pin(Archive::new(tar_stream));
// This is safe as long as we guarantee that entries never gets accessed past drop.
// The cast detaches the borrow from the local `archive` so that `entries` can be stored
// in the same struct as the archive it reads from.
let archive_static = unsafe { &mut *((&mut *archive) as *mut Archive<_>) };
let mut entries = archive_static.entries()?;
// Search for snapshot manifest.
let mut snapshot_file: Option<Entry<_>> = None;
// `by_ref` keeps `entries` usable afterwards, positioned after the manifest.
for entry in entries.by_ref() {
let entry = entry?;
let path = entry.path()?;
if Self::is_snapshot_manifest_file(&path) {
snapshot_file = Some(entry);
break;
} else if Self::is_appendvec_file(&path) {
// TODO Support archives where AppendVecs precede snapshot manifests
return Err(SnapshotError::UnexpectedAppendVec);
}
}
let snapshot_file = snapshot_file.ok_or(SnapshotError::NoSnapshotManifest)?;
//let snapshot_file_len = snapshot_file.size();
let snapshot_file_path = snapshot_file.path()?.as_ref().to_path_buf();
info!("Opening snapshot manifest: {:?}", &snapshot_file_path);
let mut snapshot_file = BufReader::new(snapshot_file);
let pre_unpack = Instant::now();
// The bank is deserialized only to advance the reader to the accounts DB fields;
// its value is discarded immediately.
let versioned_bank: DeserializableVersionedBank = deserialize_from(&mut snapshot_file)?;
drop(versioned_bank);
let versioned_bank_post_time = Instant::now();
let accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry> =
deserialize_from(&mut snapshot_file)?;
let accounts_db_fields_post_time = Instant::now();
drop(snapshot_file);
info!(
"Read bank fields in {:?}",
versioned_bank_post_time - pre_unpack
);
info!(
"Read accounts DB fields in {:?}",
accounts_db_fields_post_time - versioned_bank_post_time
);
Ok(ArchiveSnapshotExtractor {
_archive: archive,
accounts_db_fields,
entries: Some(entries),
})
}
/// Iterates over the remaining archive entries, yielding an `AppendVec` for each entry
/// whose file name parses as an append vec name.
///
/// The stored entries are taken out of `self`, so a second call yields nothing. Entries
/// with other file names are skipped; read errors are yielded as `Err`.
fn unboxed_iter(&mut self) -> impl Iterator<Item = SnapshotResult<AppendVec>> + '_ {
self.entries
.take()
.into_iter()
.flatten()
.filter_map(|entry| {
let mut entry = match entry {
Ok(x) => x,
Err(e) => return Some(Err(e.into())),
};
let path = match entry.path() {
Ok(x) => x,
Err(e) => return Some(Err(e.into())),
};
// `?` here skips entries that have no file name or a non-matching one.
let (slot, id) = path.file_name().and_then(parse_append_vec_name)?;
Some(self.process_entry(&mut entry, slot, id))
})
}
/// Reads one archive entry into an `AppendVec`.
///
/// The entry must be listed under `slot` with a matching `id` in the accounts DB fields;
/// its `accounts_current_len` determines how many bytes are read.
///
/// # Errors
///
/// Returns `SnapshotError::UnexpectedAppendVec` if no matching entry is listed, and
/// propagates I/O errors from reading.
fn process_entry(
&self,
entry: &mut Entry<'static, zstd::Decoder<'static, BufReader<Source>>>,
slot: u64,
id: u64,
) -> SnapshotResult<AppendVec> {
// An unknown slot is treated as an empty list, so the lookup below fails.
let known_vecs = self
.accounts_db_fields
.0
.get(&slot)
.map(|v| &v[..])
.unwrap_or(&[]);
let known_vec = known_vecs.iter().find(|entry| entry.id == (id as usize));
let known_vec = match known_vec {
None => return Err(SnapshotError::UnexpectedAppendVec),
Some(v) => v,
};
Ok(AppendVec::new_from_reader(
entry,
known_vec.accounts_current_len,
slot,
)?)
}
/// Returns `true` for paths of the form `snapshots/<slot>/<slot>`, where `<slot>` parses
/// as a `u64` and both components are identical.
fn is_snapshot_manifest_file(path: &Path) -> bool {
let mut components = path.components();
if components.next() != Some(Component::Normal("snapshots".as_ref())) {
return false;
}
let slot_number_str_1 = match components.next() {
Some(Component::Normal(slot)) => slot,
_ => return false,
};
// Check if slot number file is valid u64.
if slot_number_str_1
.to_str()
.and_then(|s| s.parse::<u64>().ok())
.is_none()
{
return false;
}
let slot_number_str_2 = match components.next() {
Some(Component::Normal(slot)) => slot,
_ => return false,
};
components.next().is_none() && slot_number_str_1 == slot_number_str_2
}
/// Returns `true` for paths of the form `accounts/<name>`, where `<name>` is accepted by
/// `parse_append_vec_name`.
fn is_appendvec_file(path: &Path) -> bool {
let mut components = path.components();
if components.next() != Some(Component::Normal("accounts".as_ref())) {
return false;
}
let name = match components.next() {
Some(Component::Normal(c)) => c,
_ => return false,
};
components.next().is_none() && parse_append_vec_name(name).is_some()
}
}
impl ArchiveSnapshotExtractor<File> {
    /// Opens the archive file at `path` and hands it to `from_reader`.
    ///
    /// # Errors
    ///
    /// Propagates the error from opening the file and any error from
    /// `from_reader`.
    pub fn open(path: &Path) -> SnapshotResult<Self> {
        let archive_file = File::open(path)?;
        Self::from_reader(archive_file)
    }
}

View File

@ -1,104 +0,0 @@
use {
crate::{
snapshot_utils::append_vec::{AppendVec, StoredAccountMeta},
snapshot_utils::solana::{
deserialize_from, AccountsDbFields, DeserializableVersionedBank,
SerializableAccountStorageEntry,
},
},
std::{ffi::OsStr, io::Read, path::Path, str::FromStr},
thiserror::Error,
};
pub mod append_vec;
pub mod archived;
pub mod parallel;
pub mod solana;
pub mod unpacked;
const SNAPSHOTS_DIR: &str = "snapshots";
/// Errors produced while reading a snapshot.
#[derive(Error, Debug)]
pub enum SnapshotError {
/// An underlying I/O error.
#[error("{0}")]
IOError(#[from] std::io::Error),
/// A bincode deserialization error.
#[error("Failed to deserialize: {0}")]
BincodeError(#[from] bincode::Error),
/// The status cache is missing.
#[error("Missing status cache")]
NoStatusCache,
/// No snapshot manifest file was found.
#[error("No snapshot manifest file found")]
NoSnapshotManifest,
/// An append vec was encountered where none was expected.
#[error("Unexpected AppendVec")]
UnexpectedAppendVec,
/// Read progress tracking could not be created; carries a description.
#[error("Failed to create read progress tracking: {0}")]
ReadProgressTracking(String),
}
pub type SnapshotResult<T> = Result<T, SnapshotError>;
pub type AppendVecIterator<'a> = Box<dyn Iterator<Item = SnapshotResult<AppendVec>> + 'a>;
/// A source of append vecs read from a snapshot.
pub trait SnapshotExtractor: Sized {
/// Returns a boxed iterator yielding `SnapshotResult<AppendVec>` items.
fn iter(&mut self) -> AppendVecIterator<'_>;
}
/// Parses a name of the form `<slot>.<id>` into its two `u64` parts.
///
/// Returns `None` if the name is not valid UTF-8, contains no `.`, or if
/// either side of the first `.` is not a valid `u64`.
fn parse_append_vec_name(name: &OsStr) -> Option<(u64, u64)> {
    // Split on the first '.' only; anything after it must be the id.
    let (slot_text, id_text) = name.to_str()?.split_once('.')?;
    let slot = u64::from_str(slot_text).ok()?;
    let id = u64::from_str(id_text).ok()?;
    Some((slot, id))
}
/// Walks `append_vec` from offset 0, yielding one handle per stored account.
///
/// Iteration stops at the first offset for which `get_account` returns `None`.
pub fn append_vec_iter(append_vec: &AppendVec) -> impl Iterator<Item = StoredAccountMetaHandle> {
    let mut cursor = 0usize;
    std::iter::from_fn(move || {
        // Only the next offset is needed here; the handle re-reads the account on access.
        let (_, following) = append_vec.get_account(cursor)?;
        let handle = StoredAccountMetaHandle::new(append_vec, cursor);
        cursor = following;
        Some(handle)
    })
    // Once exhausted, stay exhausted without probing the append vec again.
    .fuse()
}
/// Refers to one stored account by its append vec and byte offset; the account itself is
/// read on demand through `access`.
pub struct StoredAccountMetaHandle<'a> {
/// The append vec that holds the account.
append_vec: &'a AppendVec,
/// Byte offset of the account inside `append_vec`.
offset: usize,
}
impl<'a> StoredAccountMetaHandle<'a> {
pub const fn new(append_vec: &'a AppendVec, offset: usize) -> StoredAccountMetaHandle {
Self { append_vec, offset }
}
pub fn access(&self) -> Option<StoredAccountMeta<'_>> {
Some(self.append_vec.get_account(self.offset)?.0)
}
}
/// Hook for wrapping a reader so that read progress can be tracked.
pub trait ReadProgressTracking {
/// Takes the reader `rd` together with the `path` and `file_len` it relates to and
/// returns a reader to use in its place.
///
/// # Errors
///
/// Implementations may fail with a `SnapshotError`.
fn new_read_progress_tracker(
&self,
path: &Path,
rd: Box<dyn Read>,
file_len: u64,
) -> SnapshotResult<Box<dyn Read>>;
}
/// `ReadProgressTracking` implementation that performs no tracking.
struct NoopReadProgressTracking {}
impl ReadProgressTracking for NoopReadProgressTracking {
/// Returns `rd` unchanged; `_path` and `_file_len` are ignored.
fn new_read_progress_tracker(
&self,
_path: &Path,
rd: Box<dyn Read>,
_file_len: u64,
) -> SnapshotResult<Box<dyn Read>> {
Ok(rd)
}
}

View File

@ -1,38 +0,0 @@
use {
crate::snapshot_utils::{AppendVec, AppendVecIterator},
tokio::task::JoinSet,
};
/// Receives append vecs one at a time, e.g. from `par_iter_append_vecs`.
#[async_trait::async_trait]
pub trait AppendVecConsumer {
/// Handles a single append vec; returning an error aborts `par_iter_append_vecs`.
async fn on_append_vec(&mut self, append_vec: AppendVec) -> anyhow::Result<()>;
}
/// Feeds every append vec from `iterator` to a consumer, running up to
/// `num_threads` consumer tasks at once.
///
/// Consumers are created lazily with `create_consumer` until the limit is
/// reached; after that, each new append vec waits for a running task to
/// finish and reuses its consumer. A `num_threads` of `0` is treated as `1`.
///
/// # Errors
///
/// Returns the first error produced by the iterator, by a consumer, or by a
/// task that failed to join.
pub async fn par_iter_append_vecs<F, A>(
    iterator: AppendVecIterator<'_>,
    create_consumer: F,
    num_threads: usize,
) -> anyhow::Result<()>
where
    F: Fn() -> A,
    A: AppendVecConsumer + Send + 'static,
{
    // With a limit of zero the loop would wait on an empty `JoinSet`, whose
    // `join_next` returns `None`, and panic; always allow at least one task.
    let max_in_flight = num_threads.max(1);
    let mut tasks = JoinSet::new();
    for append_vec in iterator {
        let mut consumer = if tasks.len() >= max_in_flight {
            // The set holds at least `max_in_flight >= 1` tasks here, so
            // `join_next` cannot return `None`.
            tasks
                .join_next()
                .await
                .expect("JoinSet is non-empty when the task limit is reached")??
        } else {
            create_consumer()
        };
        tasks.spawn(async move {
            // Iterator errors surface inside the task and are reported on join.
            consumer.on_append_vec(append_vec?).await?;
            Ok::<_, anyhow::Error>(consumer)
        });
    }
    // Drain the remaining tasks so their errors are not lost.
    while let Some(result) = tasks.join_next().await {
        result??;
    }
    Ok(())
}

View File

@ -1,128 +0,0 @@
// Copyright 2022 Solana Foundation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file contains code vendored from https://github.com/solana-labs/solana
use std::collections::{HashMap, HashSet};
use std::io::Read;
use bincode::Options;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use solana_accounts_db::account_storage::meta::StoredMetaWriteVersion;
use solana_accounts_db::accounts_db::BankHashStats;
use solana_accounts_db::ancestors::AncestorsForSerialization;
use solana_accounts_db::blockhash_queue::BlockhashQueue;
use solana_frozen_abi_macro::AbiExample;
use solana_runtime::epoch_stakes::EpochStakes;
use solana_runtime::stakes::Stakes;
use solana_sdk::clock::{Epoch, UnixTimestamp};
use solana_sdk::deserialize_utils::default_on_eof;
use solana_sdk::epoch_schedule::EpochSchedule;
use solana_sdk::fee_calculator::{FeeCalculator, FeeRateGovernor};
use solana_sdk::hard_forks::HardForks;
use solana_sdk::hash::Hash;
use solana_sdk::inflation::Inflation;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::rent_collector::RentCollector;
use solana_sdk::slot_history::Slot;
use solana_sdk::stake::state::Delegation;
/// Byte limit (32 GiB) handed to bincode's `with_limit` in `deserialize_from`.
const MAX_STREAM_SIZE: u64 = 32 * 1024 * 1024 * 1024;
/// Decodes a `T` from `reader` with bincode.
///
/// The decoder is configured with a read limit of `MAX_STREAM_SIZE` bytes,
/// fixed-width integer encoding, and tolerance for bytes left over after `T`.
///
/// # Errors
///
/// Returns the `bincode::Error` produced by the decoder.
pub fn deserialize_from<R, T>(reader: R) -> bincode::Result<T>
where
    R: Read,
    T: DeserializeOwned,
{
    let codec = bincode::options()
        .with_limit(MAX_STREAM_SIZE)
        .with_fixint_encoding()
        .allow_trailing_bytes();
    codec.deserialize_from(reader)
}
/// Type of the private `unused_accounts` field of `DeserializableVersionedBank`.
///
/// NOTE(review): nothing in this file reads these fields; presumably they exist
/// only so the deserializer consumes the matching bytes of the manifest — confirm
/// against the upstream Solana definition before changing them.
#[derive(Default, PartialEq, Eq, Debug, Deserialize)]
struct UnusedAccounts {
unused1: HashSet<Pubkey>,
unused2: HashSet<Pubkey>,
unused3: HashMap<Pubkey, u64>,
}
/// Deserialization target for the bank portion of a snapshot manifest.
///
/// Values of this type are read with `deserialize_from`, i.e. through bincode.
///
/// NOTE(review): bincode decodes struct fields positionally, so the order and
/// types of the fields below presumably have to mirror the upstream Solana
/// layout exactly — confirm before reordering, adding or removing a field.
#[derive(Deserialize)]
#[allow(dead_code)]
pub struct DeserializableVersionedBank {
pub blockhash_queue: BlockhashQueue,
pub ancestors: AncestorsForSerialization,
pub hash: Hash,
pub parent_hash: Hash,
pub parent_slot: Slot,
pub hard_forks: HardForks,
pub transaction_count: u64,
pub tick_height: u64,
pub signature_count: u64,
pub capitalization: u64,
pub max_tick_height: u64,
pub hashes_per_tick: Option<u64>,
pub ticks_per_slot: u64,
pub ns_per_slot: u128,
pub genesis_creation_time: UnixTimestamp,
pub slots_per_year: f64,
pub accounts_data_len: u64,
pub slot: Slot,
pub epoch: Epoch,
pub block_height: u64,
pub collector_id: Pubkey,
pub collector_fees: u64,
pub fee_calculator: FeeCalculator,
pub fee_rate_governor: FeeRateGovernor,
pub collected_rent: u64,
pub rent_collector: RentCollector,
pub epoch_schedule: EpochSchedule,
pub inflation: Inflation,
pub stakes: Stakes<Delegation>,
// Private and never read in this file; kept as a field so it is still deserialized.
#[allow(dead_code)]
unused_accounts: UnusedAccounts,
pub epoch_stakes: HashMap<Epoch, EpochStakes>,
pub is_delta: bool,
}
/// Bank hash record carried as the fourth element of `AccountsDbFields`.
///
/// NOTE(review): the exact meaning of `hash` versus `snapshot_hash` is not
/// visible in this file — consult the upstream Solana definition.
#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq, AbiExample)]
pub struct BankHashInfo {
pub hash: Hash,
pub snapshot_hash: Hash,
pub stats: BankHashStats,
}
/// Accounts DB section of a snapshot manifest, generic over the per-storage
/// entry type `T`.
///
/// This is a tuple struct; elements are addressed by position (`.0`, `.1`, ...).
#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
pub struct AccountsDbFields<T>(
/// storage entries grouped by slot
pub HashMap<Slot, Vec<T>>,
/// NOTE(review): presumably the write version recorded with the snapshot — confirm upstream
pub StoredMetaWriteVersion,
/// NOTE(review): presumably the slot the snapshot was taken at — confirm upstream
pub Slot,
/// bank hash record, see `BankHashInfo`
pub BankHashInfo,
/// all slots that were roots within the last epoch
#[serde(deserialize_with = "default_on_eof")]
pub Vec<Slot>,
/// slots that were roots within the last epoch for which we care about the hash value
#[serde(deserialize_with = "default_on_eof")]
pub Vec<(Slot, Hash)>,
);
/// Integer type of the append vec id stored in `SerializableAccountStorageEntry::id`.
pub type SerializedAppendVecId = usize;
/// Per-append-vec entry of the manifest, used as the `T` of `AccountsDbFields`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize)]
pub struct SerializableAccountStorageEntry {
/// Append vec id; the snapshot extractor compares it with the id parsed from the file name.
pub id: SerializedAppendVecId,
/// Length value handed to `AppendVec::new_from_file` when the file is opened.
pub accounts_current_len: usize,
}

View File

@ -1,125 +0,0 @@
use {
crate::snapshot_utils::{
deserialize_from, parse_append_vec_name, AccountsDbFields, AppendVec, AppendVecIterator,
DeserializableVersionedBank, ReadProgressTracking, SerializableAccountStorageEntry,
SnapshotError, SnapshotExtractor, SnapshotResult, SNAPSHOTS_DIR,
},
itertools::Itertools,
log::info,
solana_runtime::snapshot_utils::SNAPSHOT_STATUS_CACHE_FILENAME,
std::{
fs::OpenOptions,
io::BufReader,
path::{Path, PathBuf},
str::FromStr,
time::Instant,
},
};
/// Extracts account data from snapshots that were unarchived to a file system.
pub struct UnpackedSnapshotExtractor {
/// Directory passed to `open`; the `accounts` sub-directory is resolved beneath it.
root: PathBuf,
/// Accounts DB fields read from the snapshot manifest in `open`; consulted to
/// look up each append vec file by slot and id before it is opened.
accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry>,
}
impl SnapshotExtractor for UnpackedSnapshotExtractor {
    /// Returns the append vec iterator of `unboxed_iter` behind a box.
    fn iter(&mut self) -> AppendVecIterator<'_> {
        let append_vecs = self.unboxed_iter();
        Box::new(append_vecs)
    }
}
impl UnpackedSnapshotExtractor {
    /// Opens the unpacked snapshot rooted at `path` and reads its manifest.
    ///
    /// The manifest is looked up as `<path>/<SNAPSHOTS_DIR>/<n>/<n>`, where `<n>`
    /// is the first directory entry whose name parses as a `u64`. The bank
    /// section is deserialized and discarded; only the accounts DB fields are kept.
    ///
    /// # Errors
    ///
    /// * `SnapshotError::NoStatusCache` if the status cache file is missing.
    /// * `SnapshotError::NoSnapshotManifest` if no numerically named entry exists.
    /// * Any I/O, progress-tracker or deserialization error hit on the way.
    pub fn open(
        path: &Path,
        progress_tracking: Box<dyn ReadProgressTracking>,
    ) -> SnapshotResult<Self> {
        let snapshots_dir = path.join(SNAPSHOTS_DIR);
        if !snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME).is_file() {
            return Err(SnapshotError::NoStatusCache);
        }

        // Unreadable directory entries are skipped; the first numerically named
        // entry wins.
        let manifest_path = snapshots_dir
            .read_dir()?
            .filter_map(Result::ok)
            .find_map(|entry| {
                let name = entry.file_name();
                let is_numeric = u64::from_str(&name.to_string_lossy()).is_ok();
                is_numeric.then(|| entry.path().join(&name))
            })
            .ok_or(SnapshotError::NoSnapshotManifest)?;
        info!("Opening snapshot manifest: {:?}", manifest_path);

        let manifest = OpenOptions::new().read(true).open(&manifest_path)?;
        let manifest_len = manifest.metadata()?.len();
        let tracked = progress_tracking.new_read_progress_tracker(
            &manifest_path,
            Box::new(manifest),
            manifest_len,
        )?;
        let mut reader = BufReader::new(tracked);

        let started = Instant::now();
        // The bank fields only need to be consumed so the reader is positioned
        // at the accounts DB fields; the value itself is dropped immediately.
        let _: DeserializableVersionedBank = deserialize_from(&mut reader)?;
        let bank_done = Instant::now();

        let accounts_db_fields: AccountsDbFields<SerializableAccountStorageEntry> =
            deserialize_from(&mut reader)?;
        let accounts_db_done = Instant::now();
        drop(reader);

        info!("Read bank fields in {:?}", bank_done.duration_since(started));
        info!(
            "Read accounts DB fields in {:?}",
            accounts_db_done.duration_since(bank_done)
        );

        Ok(Self {
            root: path.to_path_buf(),
            accounts_db_fields,
        })
    }

    /// Iterates over the append vecs found in the snapshot's `accounts` directory.
    ///
    /// NOTE(review): the second `flatten_ok()` iterates each inner
    /// `SnapshotResult<AppendVec>` as a collection, and an `Err` iterates as
    /// empty. Files that fail in `open_append_vec` therefore appear to be
    /// skipped silently rather than reported; only a failure to list the
    /// directory is yielded as an error. Confirm this is intended.
    pub fn unboxed_iter(&self) -> impl Iterator<Item = SnapshotResult<AppendVec>> + '_ {
        let streams = std::iter::once(self.iter_streams());
        streams.flatten_ok().flatten_ok()
    }

    /// Lists `<root>/accounts` and lazily opens every file whose name
    /// `parse_append_vec_name` accepts; other entries are ignored.
    fn iter_streams(&self) -> SnapshotResult<impl Iterator<Item = SnapshotResult<AppendVec>> + '_> {
        let accounts_dir = self.root.join("accounts");
        let entries = accounts_dir.read_dir()?;
        let append_vecs = entries
            .filter_map(Result::ok)
            .filter_map(|entry| {
                let file_name = entry.file_name();
                let (slot, id) = parse_append_vec_name(&file_name)?;
                Some((slot, id, file_name))
            })
            .map(move |(slot, id, file_name)| {
                self.open_append_vec(slot, id, &accounts_dir.join(file_name))
            });
        Ok(append_vecs)
    }

    /// Opens the append vec file at `path` for `slot` and `id`.
    ///
    /// # Errors
    ///
    /// Returns `SnapshotError::UnexpectedAppendVec` when the manifest has no
    /// entry for this `(slot, id)` pair, or the error from
    /// `AppendVec::new_from_file`.
    fn open_append_vec(&self, slot: u64, id: u64, path: &Path) -> SnapshotResult<AppendVec> {
        let wanted_id = id as usize;
        let manifest_entry = self
            .accounts_db_fields
            .0
            .get(&slot)
            .into_iter()
            .flatten()
            .find(|entry| entry.id == wanted_id)
            .ok_or(SnapshotError::UnexpectedAppendVec)?;
        Ok(AppendVec::new_from_file(
            path,
            manifest_entry.accounts_current_len,
            slot,
        )?)
    }
}

View File

@ -0,0 +1,123 @@
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use geyser_grpc_connector::Message;
use solana_sdk::clock::Slot;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use tokio::sync::mpsc::Receiver;
use tokio::task::JoinHandle;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdateSlot};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use lite_account_manager_common::account_data::{Account, AccountData, Data};
use lite_account_manager_common::commitment::Commitment;
use lite_account_manager_common::slot_info::{SlotInfo, SlotInfoWithCommitment};
use lite_account_storage::accountsdb::AccountsDb;
use lite_accounts_from_snapshot::{Config, HostUrl, import};
pub(crate) fn import_snapshots(slot: Slot, db: Arc<AccountsDb>) -> JoinHandle<()> {
let config = Config {
hosts: vec![
HostUrl::from_str("http://147.28.178.75:8899").unwrap(),
HostUrl::from_str("http://204.13.239.110:8899").unwrap(),
HostUrl::from_str("http://149.50.110.119:8899").unwrap(),
HostUrl::from_str("http://146.59.54.19:8899").unwrap(),
HostUrl::from_str("http://74.50.77.158:80").unwrap(),
HostUrl::from_str("http://149.50.104.41:8899").unwrap(),
HostUrl::from_str("http://205.209.109.158:8899").unwrap(),
]
.into_boxed_slice(),
not_before_slot: slot,
full_snapshot_path: PathBuf::from_str("/tmp/full-snapshot").unwrap(),
incremental_snapshot_path: PathBuf::from_str("/tmp/incremental-snapshot").unwrap(),
maximum_full_snapshot_archives_to_retain: NonZeroUsize::new(10).unwrap(),
maximum_incremental_snapshot_archives_to_retain: NonZeroUsize::new(10).unwrap(),
};
import(config, db)
}
/// Splits a geyser message stream into a slot channel and an account channel.
///
/// Spawns a tokio task that reads `geyser_messages_rx` until it is closed and
/// forwards
/// * account updates as [`AccountData`] (bounded channel, capacity 1000), and
/// * slot updates as [`SlotInfoWithCommitment`] (bounded channel, capacity 10).
///
/// Slot updates are forwarded with `root` set to `0`, and `parent` set to `0`
/// when the update carries no parent. All other update kinds and
/// `Message::Connecting` are ignored.
///
/// The task never panics on bad input. An account update without account info,
/// or with a pubkey or owner that cannot be converted to a [`Pubkey`], is
/// logged and skipped. The task stops when the input channel closes or when
/// either returned receiver has been dropped.
///
/// Returns `(slots_rx, accounts_rx)`.
pub(crate) fn process_stream(
    mut geyser_messages_rx: Receiver<Message>,
) -> (Receiver<SlotInfoWithCommitment>, Receiver<AccountData>) {
    let (accounts_tx, accounts_rx) = tokio::sync::mpsc::channel::<AccountData>(1000);
    let (slots_tx, slots_rx) = tokio::sync::mpsc::channel::<SlotInfoWithCommitment>(10);

    tokio::spawn(async move {
        loop {
            let update = match geyser_messages_rx.recv().await {
                Some(Message::GeyserSubscribeUpdate(update)) => update,
                Some(Message::Connecting(_)) => continue,
                None => {
                    log::warn!("multiplexer channel closed - aborting");
                    return;
                }
            };

            match update.update_oneof {
                Some(UpdateOneof::Account(update)) => {
                    let slot = update.slot;
                    // The payload comes from the network: validate it instead of unwrapping.
                    let info = match update.account {
                        Some(info) => info,
                        None => {
                            log::warn!(
                                "account update for slot {} carries no account info - skipping",
                                slot
                            );
                            continue;
                        }
                    };
                    let (account_pk, account_owner_pk) = match (
                        Pubkey::try_from(info.pubkey),
                        Pubkey::try_from(info.owner),
                    ) {
                        (Ok(pubkey), Ok(owner)) => (pubkey, owner),
                        _ => {
                            log::warn!(
                                "account update for slot {} has a malformed pubkey or owner - skipping",
                                slot
                            );
                            continue;
                        }
                    };

                    let account_data = AccountData {
                        pubkey: account_pk,
                        account: Arc::new(Account {
                            lamports: info.lamports,
                            data: Data::Uncompressed(info.data),
                            owner: account_owner_pk,
                            executable: info.executable,
                            rent_epoch: info.rent_epoch,
                        }),
                        updated_slot: slot,
                        write_version: info.write_version,
                    };
                    // A send error means the receiver is gone; nothing left to do.
                    if accounts_tx.send(account_data).await.is_err() {
                        log::warn!("account receiver dropped - aborting");
                        return;
                    }
                }
                Some(UpdateOneof::Slot(slot)) => {
                    let slot_info = SlotInfoWithCommitment {
                        info: SlotInfo {
                            slot: slot.slot,
                            parent: slot.parent.unwrap_or(0),
                            root: 0,
                        },
                        commitment: Commitment::from(slot.status),
                    };
                    if slots_tx.send(slot_info).await.is_err() {
                        log::warn!("slot receiver dropped - aborting");
                        return;
                    }
                }
                // Every other update kind (and an empty update) is ignored.
                _ => {}
            }
        }
    });

    (slots_rx, accounts_rx)
}
/// Builds a subscribe request with one account filter and one slot filter,
/// both registered under the key `"client"`.
///
/// The account filter has empty `account`, `owner` and `filters` lists, and the
/// slot filter leaves `filter_by_commitment` unset. Every other request field
/// keeps its `Default` value.
pub fn all_accounts() -> SubscribeRequest {
    let accounts_filter = SubscribeRequestFilterAccounts {
        account: Vec::new(),
        owner: Vec::new(),
        filters: Vec::new(),
    };
    let slots_filter = SubscribeRequestFilterSlots {
        filter_by_commitment: None,
    };

    SubscribeRequest {
        accounts: HashMap::from([("client".to_string(), accounts_filter)]),
        slots: HashMap::from([("client".to_string(), slots_filter)]),
        ..Default::default()
    }
}