diff --git a/account_storage/src/account_data_by_commitment.rs b/account_storage/src/account_data_by_commitment.rs index d5975f6..eb8a654 100644 --- a/account_storage/src/account_data_by_commitment.rs +++ b/account_storage/src/account_data_by_commitment.rs @@ -53,7 +53,7 @@ impl AccountDataByCommitment { pub fn get_account_data_filtered( &self, commitment: Commitment, - mut filters: Vec, + filters: &Vec, ) -> Option { let account_data = match commitment { Commitment::Processed => self @@ -69,28 +69,15 @@ impl AccountDataByCommitment { return None; }; - let tmp_filters = filters.clone(); - let size = tmp_filters.iter().enumerate().find(|x| match x.1 { - AccountFilterType::Datasize(_) => true, - AccountFilterType::Memcmp(_) => false, - }); - - // filter by size - match size { - Some((index, AccountFilterType::Datasize(size))) => { - if *size != account_data.account.data.len() as u64 { + // check size filter first before decompressing + for filter in filters { + if let AccountFilterType::Datasize(size) = filter { + if account_data.account.data.len() as u64 != *size { return None; - } else { - filters.remove(index); } } - None => { - // check other filters - } - Some(_) => { - return None; - } } + // match other filters if !filters.is_empty() { let data = account_data.account.data.data(); diff --git a/account_storage/src/inmemory_account_store.rs b/account_storage/src/inmemory_account_store.rs index 0b34627..a3fa323 100644 --- a/account_storage/src/inmemory_account_store.rs +++ b/account_storage/src/inmemory_account_store.rs @@ -16,6 +16,7 @@ use lite_account_manager_common::{ use prometheus::{opts, register_int_gauge, IntGauge}; use solana_sdk::{pubkey::Pubkey, slot_history::Slot}; use std::collections::BTreeMap; +use std::sync::RwLock; lazy_static::lazy_static! { static ref ACCOUNT_STORED_IN_MEMORY: IntGauge = @@ -36,7 +37,7 @@ struct SlotStatus { pub struct InmemoryAccountStore { account_store: Arc>, - accounts_by_owner: Arc>>, + accounts_by_owner: Arc>>>>, slots_status: Arc>>, filtered_accounts: Arc, } @@ -54,12 +55,12 @@ impl InmemoryAccountStore { fn add_account_owner(&self, account: Pubkey, owner: Pubkey) { match self.accounts_by_owner.entry(owner) { dashmap::mapref::entry::Entry::Occupied(mut occ) => { - occ.get_mut().insert(account); + occ.get_mut().write().unwrap().insert(account); } dashmap::mapref::entry::Entry::Vacant(vc) => { let mut set = HashSet::new(); set.insert(account); - vc.insert(set); + vc.insert(Arc::new(RwLock::new(set))); } } } @@ -81,7 +82,10 @@ impl InmemoryAccountStore { .entry(prev_account_data.account.owner) { dashmap::mapref::entry::Entry::Occupied(mut occ) => { - occ.get_mut().remove(&prev_account_data.pubkey); + occ.get_mut() + .write() + .unwrap() + .remove(&prev_account_data.pubkey); } dashmap::mapref::entry::Entry::Vacant(_) => { // do nothing @@ -237,48 +241,48 @@ impl AccountStorageInterface for InmemoryAccountStore { }) { return Err(AccountLoadingError::ConfigDoesnotContainRequiredFilters); } - match self.accounts_by_owner.entry(program_pubkey) { - dashmap::mapref::entry::Entry::Occupied(occ) => { - let mut return_vec = vec![]; - let program_pubkeys = occ.get(); - for program_account in program_pubkeys { - match &account_filters { - Some(account_filters) => { - match self.account_store.entry(*program_account) { - dashmap::mapref::entry::Entry::Occupied(occ) => { - let account = occ.get().get_account_data_filtered( - commitment, - account_filters.clone(), - ); - drop(occ); - if let Some(account_data) = account { - if account_data.account.lamports > 0 - && account_data.account.owner == program_pubkey - { - return_vec.push(account_data); - } - } - } - dashmap::mapref::entry::Entry::Vacant(_) => { - // do nothing - } - }; - } - None => { - let account_data = self.get_account(*program_account, commitment)?; - if let Some(account_data) = account_data { - // recheck owner - if program_pubkey == account_data.account.owner { + let lk = match self.accounts_by_owner.entry(program_pubkey) { + dashmap::mapref::entry::Entry::Occupied(occ) => occ.get().clone(), + dashmap::mapref::entry::Entry::Vacant(_) => { + return Ok(vec![]); + } + }; + + let mut return_vec = vec![]; + for program_account in lk.read().unwrap().iter() { + match &account_filters { + Some(account_filters) => { + match self.account_store.entry(*program_account) { + dashmap::mapref::entry::Entry::Occupied(occ) => { + let account = occ + .get() + .get_account_data_filtered(commitment, account_filters); + drop(occ); + if let Some(account_data) = account { + if account_data.account.lamports > 0 + && account_data.account.owner == program_pubkey + { return_vec.push(account_data); } } } + dashmap::mapref::entry::Entry::Vacant(_) => { + // do nothing + } + }; + } + None => { + let account_data = self.get_account(*program_account, commitment)?; + if let Some(account_data) = account_data { + // recheck owner + if program_pubkey == account_data.account.owner { + return_vec.push(account_data); + } } } - Ok(return_vec) } - dashmap::mapref::entry::Entry::Vacant(_) => Ok(vec![]), } + Ok(return_vec) } fn process_slot_data(&self, slot_info: SlotInfo, commitment: Commitment) -> Vec { diff --git a/common/src/account_data.rs b/common/src/account_data.rs index 1e5763d..86f48dd 100644 --- a/common/src/account_data.rs +++ b/common/src/account_data.rs @@ -24,13 +24,13 @@ pub enum Data { } impl Data { - pub fn new(data:&[u8], compression_method: CompressionMethod) -> Self { + pub fn new(data: &[u8], compression_method: CompressionMethod) -> Self { match compression_method { CompressionMethod::None => Data::Uncompressed(data.to_vec()), CompressionMethod::Lz4(level) => { let len = data.len(); let binary = lz4::block::compress( - &data, + data, Some(lz4::block::CompressionMode::FAST(level)), true, ) @@ -39,7 +39,7 @@ impl Data { } CompressionMethod::Zstd(level) => { let len = data.len(); - let binary = zstd::bulk::compress(&data, level).unwrap(); + let binary = zstd::bulk::compress(data, level).unwrap(); Data::Zstd { binary, len } } } diff --git a/simulate_from_snapshot/Cargo.toml b/simulate_from_snapshot/Cargo.toml new file mode 100644 index 0000000..5e6a47a --- /dev/null +++ b/simulate_from_snapshot/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "lite-accounts-simulate_from_snapshot" +version = "0.1.0" +edition = "2021" + +[dependencies] +lite-account-manager-common = { workspace = true } +lite-account-storage = { workspace = true } +lite-token-account-storage = { workspace = true } + +solana-sdk = {workspace = true} +solana-client = {workspace = true} +solana-accounts-db = {workspace = true} +solana-frozen-abi-macro = {workspace = true} +solana-runtime = {workspace = true} +solana-rpc-client-api = { workspace = true } +solana-account-decoder = { workspace = true } + +itertools = { workspace = true } +futures = { workspace = true } +lazy_static = {workspace = true} +prometheus = {workspace = true} +log = {workspace = true} +bincode = {workspace = true} +serde = { workspace = true } +anyhow = { workspace = true } + +clap = { version = "4.5.4", features = ["derive"] } +memmap2 = "0.5.5" +thiserror = "1.0.31" +tar = "0.4.38" +tokio = { version = "1.38.0", features = ["rt-multi-thread", "macros"]} +zstd = "0.13.2" +async-trait = "0.1.73" +env_logger = "0.9.0" +base64 = "0.21.0" + +jsonrpsee = { version = "0.20.0", features = ["macros", "full"] } +tower = "0.4.13" +tower-http = { version = "0.4.0", features = ["full"] } + + +quic-geyser-client = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", branch = "main" } +quic-geyser-common = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", branch = "main" } \ No newline at end of file diff --git a/simulate_from_snapshot/src/cli.rs b/simulate_from_snapshot/src/cli.rs new file mode 100644 index 0000000..bf48dca --- /dev/null +++ b/simulate_from_snapshot/src/cli.rs @@ -0,0 +1,11 @@ +use clap::Parser; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +pub struct Args { + #[arg(short = 's', long)] + pub snapshot_archive_path: String, + + #[arg(short = 'u', long)] + pub quic_url: Option, +} diff --git a/simulate_from_snapshot/src/main.rs b/simulate_from_snapshot/src/main.rs new file mode 100644 index 0000000..7cade5f --- /dev/null +++ b/simulate_from_snapshot/src/main.rs @@ -0,0 +1,168 @@ +use std::{fs::File, path::PathBuf, str::FromStr, sync::Arc}; + +use clap::Parser; +use cli::Args; +use lite_account_manager_common::{ + account_data::{Account, AccountData, CompressionMethod, Data}, + account_store_interface::AccountStorageInterface, + commitment::Commitment, + slot_info::SlotInfo, +}; +use lite_token_account_storage::{ + inmemory_token_account_storage::InmemoryTokenAccountStorage, + inmemory_token_storage::TokenProgramAccountsStorage, +}; +use quic_geyser_common::{ + filters::Filter, message::Message, types::connections_parameters::ConnectionParameters, +}; +use snapshot_utils::{append_vec_iter, archived::ArchiveSnapshotExtractor, SnapshotExtractor}; +use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; + +use crate::rpc_server::RpcServerImpl; + +pub mod cli; +pub mod rpc_server; +pub mod snapshot_utils; + +#[tokio::main(worker_threads = 2)] +async fn main() { + env_logger::init_from_env( + env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), + ); + + let args = Args::parse(); + println!("tester args : {:?}", args); + + let Args { + snapshot_archive_path, + quic_url, + } = args; + + let token_account_storage = Arc::new(InmemoryTokenAccountStorage::default()); + let token_storage: Arc = + Arc::new(TokenProgramAccountsStorage::new(token_account_storage)); + + // fill from quic geyser stream + if let Some(quic_url) = quic_url { + let token_storage = token_storage.clone(); + tokio::spawn(async move { + let (quic_client, mut reciever, _jh) = + quic_geyser_client::non_blocking::client::Client::new( + quic_url, + ConnectionParameters::default(), + ) + .await + .unwrap(); + quic_client + .subscribe(vec![Filter::AccountsAll, Filter::Slot]) + .await + .unwrap(); + while let Some(message) = reciever.recv().await { + match message { + Message::AccountMsg(account) => { + let compression_method = match account.compression_type { + quic_geyser_common::compression::CompressionType::None => { + CompressionMethod::None + } + quic_geyser_common::compression::CompressionType::Lz4Fast(v) => { + CompressionMethod::Lz4(v) + } + quic_geyser_common::compression::CompressionType::Lz4(v) => { + CompressionMethod::Lz4(v) + } + }; + let account_data = AccountData { + pubkey: account.pubkey, + account: Arc::new(Account { + lamports: account.lamports, + data: Data::new(&account.data, compression_method), + owner: account.owner, + executable: account.executable, + rent_epoch: account.rent_epoch, + }), + updated_slot: account.slot_identifier.slot, + write_version: account.write_version, + }; + token_storage.update_account( + account_data, + lite_account_manager_common::commitment::Commitment::Processed, + ); + } + Message::SlotMsg(slot_msg) => { + if slot_msg.commitment_config == CommitmentConfig::confirmed() + || slot_msg.commitment_config == CommitmentConfig::finalized() + { + let commitment = + if slot_msg.commitment_config == CommitmentConfig::confirmed() { + Commitment::Confirmed + } else { + Commitment::Finalized + }; + token_storage.process_slot_data( + SlotInfo { + slot: slot_msg.slot, + parent: slot_msg.parent, + root: 0, + }, + commitment, + ); + } + } + _ => { + //not supported + } + } + } + + println!("stopping geyser stream"); + log::error!("stopping geyser stream"); + }); + } + + // load accounts from snapshot + let bk = { + let token_storage = token_storage.clone(); + let token_program = + Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap(); + tokio::task::spawn_blocking(move || { + let archive_path = PathBuf::from_str(snapshot_archive_path.as_str()).unwrap(); + + let mut loader: ArchiveSnapshotExtractor = + ArchiveSnapshotExtractor::open(&archive_path).unwrap(); + for vec in loader.iter() { + let append_vec = vec.unwrap(); + // info!("size: {:?}", append_vec.len()); + for handle in append_vec_iter(&append_vec) { + let stored = handle.access().unwrap(); + if stored.account_meta.owner != token_program { + continue; + } + + let data = stored.data; + let compressed_data = Data::new(data, CompressionMethod::None); + token_storage.initilize_or_update_account(AccountData { + pubkey: stored.meta.pubkey, + account: Arc::new(Account { + lamports: stored.account_meta.lamports, + data: compressed_data, + owner: stored.account_meta.owner, + executable: stored.account_meta.executable, + rent_epoch: stored.account_meta.rent_epoch, + }), + updated_slot: 0, + write_version: 0, + }); + } + } + }) + }; + // await for loading of snapshot to finish + bk.await.unwrap(); + + log::info!("Storage Initialized with snapshot"); + let rpc_server = RpcServerImpl::new(token_storage); + let jh = RpcServerImpl::start_serving(rpc_server, 10700) + .await + .unwrap(); + let _ = jh.await; +} diff --git a/simulate_from_snapshot/src/rpc_server.rs b/simulate_from_snapshot/src/rpc_server.rs new file mode 100644 index 0000000..45cbc69 --- /dev/null +++ b/simulate_from_snapshot/src/rpc_server.rs @@ -0,0 +1,232 @@ +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; + +use base64::Engine; +use itertools::Itertools; +use jsonrpsee::server::ServerBuilder; +use jsonrpsee::{core::RpcResult, proc_macros::rpc}; +use lite_account_manager_common::account_filter::AccountFilterType as AmAccountFilterType; +use lite_account_manager_common::account_store_interface::AccountStorageInterface; +use lite_account_manager_common::{account_data::AccountData, commitment::Commitment}; +use solana_account_decoder::UiAccount; +use solana_rpc_client_api::client_error::reqwest::Method; +use solana_rpc_client_api::{ + config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, + response::{OptionalContext, Response as RpcResponse, RpcKeyedAccount, RpcResponseContext}, +}; +use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; +use tokio::task::JoinHandle; +use tower_http::cors::{Any, CorsLayer}; + +#[rpc(server)] +pub trait TestRpc { + #[method(name = "getProgramAccounts")] + async fn get_program_accounts( + &self, + program_id_str: String, + config: Option, + ) -> RpcResult>>; + + #[method(name = "getSnapshot")] + async fn get_snapshot(&self, program_id_str: String) -> RpcResult; + + #[method(name = "getAccountInfo")] + async fn get_account_info( + &self, + pubkey_str: String, + config: Option, + ) -> RpcResult>>; +} + +pub struct RpcServerImpl { + storage: Arc, +} + +impl RpcServerImpl { + pub fn new(storage: Arc) -> Self { + Self { storage } + } + + pub async fn start_serving( + rpc_impl: RpcServerImpl, + port: u16, + ) -> anyhow::Result> { + let http_addr = format!("[::]:{port}"); + let cors = CorsLayer::new() + .max_age(Duration::from_secs(86400)) + // Allow `POST` when accessing the resource + .allow_methods([Method::POST, Method::GET, Method::OPTIONS]) + // Allow requests from any origin + .allow_origin(Any) + .allow_headers(Any); + + let middleware = tower::ServiceBuilder::new().layer(cors); + + let http_server_handle = ServerBuilder::default() + .set_middleware(middleware) + .max_connections(10) + .max_request_body_size(1024 * 1024) // 16 MB + .max_response_body_size(512 * 1024 * 1024) // 512 MBs + .http_only() + .build(http_addr.clone()) + .await? + .start(rpc_impl.into_rpc()); + + let jh = tokio::spawn(async move { + log::info!("HTTP Server started at {http_addr:?}"); + http_server_handle.stopped().await; + log::error!("QUIC GEYSER PLUGIN HTTP SERVER STOPPED"); + }); + Ok(jh) + } +} + +#[jsonrpsee::core::async_trait] +impl TestRpcServer for RpcServerImpl { + async fn get_program_accounts( + &self, + program_id_str: String, + config: Option, + ) -> RpcResult>> { + let Ok(program_id) = Pubkey::from_str(&program_id_str) else { + return Err(jsonrpsee::types::error::ErrorCode::InternalError.into()); + }; + let with_context = config + .as_ref() + .map(|value| value.with_context.unwrap_or_default()) + .unwrap_or_default(); + + let commitment: CommitmentConfig = config + .as_ref() + .and_then(|x| x.account_config.commitment) + .unwrap_or_default(); + + let account_filters = config + .as_ref() + .map(|x| { + x.filters + .as_ref() + .map(|filters| filters.iter().map(AmAccountFilterType::from).collect_vec()) + }) + .unwrap_or_default(); + + let commitment = Commitment::from(commitment); + + let gpa = self + .storage + .get_program_accounts(program_id, account_filters, commitment) + .map_err(|_| jsonrpsee::types::error::ErrorCode::InternalError)?; + + let min_context_slot = config + .as_ref() + .map(|c| { + if c.with_context.unwrap_or_default() { + c.account_config.min_context_slot + } else { + None + } + }) + .unwrap_or_default() + .unwrap_or_default(); + + let slot = gpa + .iter() + .map(|program_account| program_account.updated_slot) + .max() + .unwrap_or_default(); + let acc_config = config.map(|c| c.account_config); + + let rpc_keyed_accounts = gpa + .iter() + .filter_map(|account_data| { + if account_data.updated_slot >= min_context_slot { + Some(RpcKeyedAccount { + pubkey: account_data.pubkey.to_string(), + account: convert_account_data_to_ui_account( + account_data, + acc_config.clone(), + ), + }) + } else { + None + } + }) + .collect_vec(); + + if with_context { + Ok(OptionalContext::Context(RpcResponse { + context: RpcResponseContext { + slot, + api_version: None, + }, + value: rpc_keyed_accounts, + })) + } else { + Ok(OptionalContext::NoContext(rpc_keyed_accounts)) + } + } + + async fn get_snapshot(&self, program_id_str: String) -> RpcResult { + let program_id = Pubkey::from_str(program_id_str.as_str()) + .map_err(|_| jsonrpsee::types::error::ErrorCode::InvalidParams)?; + let res = self + .storage + .create_snapshot(program_id) + .map_err(|_| jsonrpsee::types::error::ErrorCode::InternalError)?; + Ok(base64::engine::general_purpose::STANDARD.encode(res)) + } + + async fn get_account_info( + &self, + pubkey_str: String, + config: Option, + ) -> RpcResult>> { + let account_pk = Pubkey::from_str(pubkey_str.as_str()) + .map_err(|_| jsonrpsee::types::error::ErrorCode::InvalidParams)?; + let commitment = config + .clone() + .and_then(|x| x.commitment) + .unwrap_or_default(); + let acc = self + .storage + .get_account(account_pk, Commitment::from(commitment)) + .map_err(|_| jsonrpsee::types::error::ErrorCode::InternalError)?; + + match acc { + Some(acc) => Ok(RpcResponse { + context: RpcResponseContext { + slot: acc.updated_slot, + api_version: None, + }, + value: Some(convert_account_data_to_ui_account(&acc, config)), + }), + None => Ok(RpcResponse { + context: RpcResponseContext { + slot: 0, + api_version: None, + }, + value: None, + }), + } + } +} + +pub fn convert_account_data_to_ui_account( + account_data: &AccountData, + config: Option, +) -> UiAccount { + let encoding = config + .as_ref() + .map(|c| c.encoding) + .unwrap_or_default() + .unwrap_or(solana_account_decoder::UiAccountEncoding::Base64); + let data_slice = config.as_ref().map(|c| c.data_slice).unwrap_or_default(); + UiAccount::encode( + &account_data.pubkey, + &account_data.account.to_solana_account(), + encoding, + None, + data_slice, + ) +} diff --git a/simulate_from_snapshot/src/snapshot_utils/append_vec.rs b/simulate_from_snapshot/src/snapshot_utils/append_vec.rs new file mode 100644 index 0000000..53585e6 --- /dev/null +++ b/simulate_from_snapshot/src/snapshot_utils/append_vec.rs @@ -0,0 +1,229 @@ +// Copyright 2022 Solana Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file contains code vendored from https://github.com/solana-labs/solana +// Source: solana/runtime/src/append_vec.rs + +use { + log::*, + memmap2::{Mmap, MmapMut}, + solana_accounts_db::{ + account_storage::meta::{AccountMeta, StoredMeta}, + accounts_file::ALIGN_BOUNDARY_OFFSET, + append_vec::MAXIMUM_APPEND_VEC_FILE_SIZE, + u64_align, + }, + solana_sdk::{ + account::{Account, AccountSharedData}, + hash::Hash, + }, + std::{ + convert::TryFrom, + fs::OpenOptions, + io::{self, Read}, + mem, + path::Path, + }, +}; + +/// References to account data stored elsewhere. Getting an `Account` requires cloning +/// (see `StoredAccountMeta::clone_account()`). +#[derive(PartialEq, Eq, Debug)] +pub struct StoredAccountMeta<'a> { + pub meta: &'a StoredMeta, + /// account data + pub account_meta: &'a AccountMeta, + pub data: &'a [u8], + pub offset: usize, + pub stored_size: usize, + pub hash: &'a Hash, +} + +impl<'a> StoredAccountMeta<'a> { + /// Return a new Account by copying all the data referenced by the `StoredAccountMeta`. + pub fn clone_account(&self) -> AccountSharedData { + AccountSharedData::from(Account { + lamports: self.account_meta.lamports, + owner: self.account_meta.owner, + executable: self.account_meta.executable, + rent_epoch: self.account_meta.rent_epoch, + data: self.data.to_vec(), + }) + } +} + +/// A thread-safe, file-backed block of memory used to store `Account` instances. Append operations +/// are serialized such that only one thread updates the internal `append_lock` at a time. No +/// restrictions are placed on reading. That is, one may read items from one thread while another +/// is appending new items. +pub struct AppendVec { + /// A file-backed block of memory that is used to store the data for each appended item. + map: Mmap, + + /// The number of bytes used to store items, not the number of items. + current_len: usize, + + /// The number of bytes available for storing items. + file_size: u64, + + slot: u64, +} + +impl AppendVec { + fn sanitize_len_and_size(current_len: usize, file_size: usize) -> io::Result<()> { + if file_size == 0 { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("too small file size {} for AppendVec", file_size), + )) + } else if usize::try_from(MAXIMUM_APPEND_VEC_FILE_SIZE) + .map(|max| file_size > max) + .unwrap_or(true) + { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("too large file size {} for AppendVec", file_size), + )) + } else if current_len > file_size { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("current_len is larger than file size ({})", file_size), + )) + } else { + Ok(()) + } + } + + /// how many more bytes can be stored in this append vec + pub const fn remaining_bytes(&self) -> u64 { + (self.capacity()).saturating_sub(self.len() as u64) + } + + pub const fn len(&self) -> usize { + self.current_len + } + + pub const fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub const fn capacity(&self) -> u64 { + self.file_size + } + + pub fn new_from_file>( + path: P, + current_len: usize, + slot: u64, + ) -> io::Result { + let data = OpenOptions::new() + .read(true) + .write(false) + .create(false) + .open(&path)?; + + let file_size = std::fs::metadata(&path)?.len(); + AppendVec::sanitize_len_and_size(current_len, file_size as usize)?; + + let map = unsafe { + let result = Mmap::map(&data); + if result.is_err() { + // for vm.max_map_count, error is: {code: 12, kind: Other, message: "Cannot allocate memory"} + info!("memory map error: {:?}. This may be because vm.max_map_count is not set correctly.", result); + } + result? + }; + + let new = AppendVec { + map, + current_len, + file_size, + slot, + }; + + Ok(new) + } + + pub fn new_from_reader( + reader: &mut R, + current_len: usize, + slot: u64, + ) -> io::Result { + let mut map = MmapMut::map_anon(current_len)?; + io::copy(&mut reader.take(current_len as u64), &mut map.as_mut())?; + Ok(AppendVec { + map: map.make_read_only()?, + current_len, + file_size: current_len as u64, + slot, + }) + } + + /// Get a reference to the data at `offset` of `size` bytes if that slice + /// doesn't overrun the internal buffer. Otherwise return None. + /// Also return the offset of the first byte after the requested data that + /// falls on a 64-byte boundary. + fn get_slice(&self, offset: usize, size: usize) -> Option<(&[u8], usize)> { + let (next, overflow) = offset.overflowing_add(size); + if overflow || next > self.len() { + return None; + } + let data = &self.map[offset..next]; + let next = u64_align!(next); + + Some(( + //UNSAFE: This unsafe creates a slice that represents a chunk of self.map memory + //The lifetime of this slice is tied to &self, since it points to self.map memory + unsafe { std::slice::from_raw_parts(data.as_ptr(), size) }, + next, + )) + } + + /// Return a reference to the type at `offset` if its data doesn't overrun the internal buffer. + /// Otherwise return None. Also return the offset of the first byte after the requested data + /// that falls on a 64-byte boundary. + fn get_type<'a, T>(&self, offset: usize) -> Option<(&'a T, usize)> { + let (data, next) = self.get_slice(offset, mem::size_of::())?; + let ptr: *const T = data.as_ptr() as *const T; + //UNSAFE: The cast is safe because the slice is aligned and fits into the memory + //and the lifetime of the &T is tied to self, which holds the underlying memory map + Some((unsafe { &*ptr }, next)) + } + + /// Return account metadata for the account at `offset` if its data doesn't overrun + /// the internal buffer. Otherwise return None. Also return the offset of the first byte + /// after the requested data that falls on a 64-byte boundary. + pub fn get_account<'a>(&'a self, offset: usize) -> Option<(StoredAccountMeta<'a>, usize)> { + let (meta, next): (&'a StoredMeta, _) = self.get_type(offset)?; + let (account_meta, next): (&'a AccountMeta, _) = self.get_type(next)?; + let (hash, next): (&'a Hash, _) = self.get_type(next)?; + let (data, next) = self.get_slice(next, meta.data_len as usize)?; + let stored_size = next - offset; + Some(( + StoredAccountMeta { + meta, + account_meta, + data, + offset, + stored_size, + hash, + }, + next, + )) + } + + pub const fn slot(&self) -> u64 { + self.slot + } +} diff --git a/simulate_from_snapshot/src/snapshot_utils/archived.rs b/simulate_from_snapshot/src/snapshot_utils/archived.rs new file mode 100644 index 0000000..abdb2b2 --- /dev/null +++ b/simulate_from_snapshot/src/snapshot_utils/archived.rs @@ -0,0 +1,179 @@ +use { + crate::snapshot_utils::{ + deserialize_from, parse_append_vec_name, AccountsDbFields, AppendVec, AppendVecIterator, + DeserializableVersionedBank, SerializableAccountStorageEntry, SnapshotError, + SnapshotExtractor, SnapshotResult, + }, + log::info, + std::{ + fs::File, + io::{BufReader, Read}, + path::{Component, Path}, + pin::Pin, + time::Instant, + }, + tar::{Archive, Entries, Entry}, +}; + +/// Extracts account data from a .tar.zst stream. +pub struct ArchiveSnapshotExtractor +where + Source: Read + Unpin + 'static, +{ + accounts_db_fields: AccountsDbFields, + _archive: Pin>>>>, + entries: Option>>>, +} + +impl SnapshotExtractor for ArchiveSnapshotExtractor +where + Source: Read + Unpin + 'static, +{ + fn iter(&mut self) -> AppendVecIterator<'_> { + Box::new(self.unboxed_iter()) + } +} + +impl ArchiveSnapshotExtractor +where + Source: Read + Unpin + 'static, +{ + pub fn from_reader(source: Source) -> SnapshotResult { + let tar_stream = zstd::stream::read::Decoder::new(source)?; + let mut archive = Box::pin(Archive::new(tar_stream)); + + // This is safe as long as we guarantee that entries never gets accessed past drop. + let archive_static = unsafe { &mut *((&mut *archive) as *mut Archive<_>) }; + let mut entries = archive_static.entries()?; + + // Search for snapshot manifest. + let mut snapshot_file: Option> = None; + for entry in entries.by_ref() { + let entry = entry?; + let path = entry.path()?; + if Self::is_snapshot_manifest_file(&path) { + snapshot_file = Some(entry); + break; + } else if Self::is_appendvec_file(&path) { + // TODO Support archives where AppendVecs precede snapshot manifests + return Err(SnapshotError::UnexpectedAppendVec); + } + } + let snapshot_file = snapshot_file.ok_or(SnapshotError::NoSnapshotManifest)?; + //let snapshot_file_len = snapshot_file.size(); + let snapshot_file_path = snapshot_file.path()?.as_ref().to_path_buf(); + + info!("Opening snapshot manifest: {:?}", &snapshot_file_path); + let mut snapshot_file = BufReader::new(snapshot_file); + + let pre_unpack = Instant::now(); + let versioned_bank: DeserializableVersionedBank = deserialize_from(&mut snapshot_file)?; + drop(versioned_bank); + let versioned_bank_post_time = Instant::now(); + + let accounts_db_fields: AccountsDbFields = + deserialize_from(&mut snapshot_file)?; + let accounts_db_fields_post_time = Instant::now(); + drop(snapshot_file); + + info!( + "Read bank fields in {:?}", + versioned_bank_post_time - pre_unpack + ); + info!( + "Read accounts DB fields in {:?}", + accounts_db_fields_post_time - versioned_bank_post_time + ); + + Ok(ArchiveSnapshotExtractor { + _archive: archive, + accounts_db_fields, + entries: Some(entries), + }) + } + + fn unboxed_iter(&mut self) -> impl Iterator> + '_ { + self.entries + .take() + .into_iter() + .flatten() + .filter_map(|entry| { + let mut entry = match entry { + Ok(x) => x, + Err(e) => return Some(Err(e.into())), + }; + let path = match entry.path() { + Ok(x) => x, + Err(e) => return Some(Err(e.into())), + }; + let (slot, id) = path.file_name().and_then(parse_append_vec_name)?; + Some(self.process_entry(&mut entry, slot, id)) + }) + } + + fn process_entry( + &self, + entry: &mut Entry<'static, zstd::Decoder<'static, BufReader>>, + slot: u64, + id: u64, + ) -> SnapshotResult { + let known_vecs = self + .accounts_db_fields + .0 + .get(&slot) + .map(|v| &v[..]) + .unwrap_or(&[]); + let known_vec = known_vecs.iter().find(|entry| entry.id == (id as usize)); + let known_vec = match known_vec { + None => return Err(SnapshotError::UnexpectedAppendVec), + Some(v) => v, + }; + Ok(AppendVec::new_from_reader( + entry, + known_vec.accounts_current_len, + slot, + )?) + } + + fn is_snapshot_manifest_file(path: &Path) -> bool { + let mut components = path.components(); + if components.next() != Some(Component::Normal("snapshots".as_ref())) { + return false; + } + let slot_number_str_1 = match components.next() { + Some(Component::Normal(slot)) => slot, + _ => return false, + }; + // Check if slot number file is valid u64. + if slot_number_str_1 + .to_str() + .and_then(|s| s.parse::().ok()) + .is_none() + { + return false; + } + let slot_number_str_2 = match components.next() { + Some(Component::Normal(slot)) => slot, + _ => return false, + }; + components.next().is_none() && slot_number_str_1 == slot_number_str_2 + } + + fn is_appendvec_file(path: &Path) -> bool { + let mut components = path.components(); + if components.next() != Some(Component::Normal("accounts".as_ref())) { + return false; + } + let name = match components.next() { + Some(Component::Normal(c)) => c, + _ => return false, + }; + components.next().is_none() && parse_append_vec_name(name).is_some() + } +} + +impl ArchiveSnapshotExtractor { + pub fn open(path: &Path) -> SnapshotResult { + Self::from_reader(File::open(path)?) + } +} diff --git a/simulate_from_snapshot/src/snapshot_utils/mod.rs b/simulate_from_snapshot/src/snapshot_utils/mod.rs new file mode 100644 index 0000000..1602484 --- /dev/null +++ b/simulate_from_snapshot/src/snapshot_utils/mod.rs @@ -0,0 +1,104 @@ +use { + crate::{ + snapshot_utils::append_vec::{AppendVec, StoredAccountMeta}, + snapshot_utils::solana::{ + deserialize_from, AccountsDbFields, DeserializableVersionedBank, + SerializableAccountStorageEntry, + }, + }, + std::{ffi::OsStr, io::Read, path::Path, str::FromStr}, + thiserror::Error, +}; + +pub mod append_vec; +pub mod archived; +pub mod parallel; +pub mod solana; +pub mod unpacked; + +const SNAPSHOTS_DIR: &str = "snapshots"; + +#[derive(Error, Debug)] +pub enum SnapshotError { + #[error("{0}")] + IOError(#[from] std::io::Error), + #[error("Failed to deserialize: {0}")] + BincodeError(#[from] bincode::Error), + #[error("Missing status cache")] + NoStatusCache, + #[error("No snapshot manifest file found")] + NoSnapshotManifest, + #[error("Unexpected AppendVec")] + UnexpectedAppendVec, + #[error("Failed to create read progress tracking: {0}")] + ReadProgressTracking(String), +} + +pub type SnapshotResult = Result; + +pub type AppendVecIterator<'a> = Box> + 'a>; + +pub trait SnapshotExtractor: Sized { + fn iter(&mut self) -> AppendVecIterator<'_>; +} + +fn parse_append_vec_name(name: &OsStr) -> Option<(u64, u64)> { + let name = name.to_str()?; + let mut parts = name.splitn(2, '.'); + let slot = u64::from_str(parts.next().unwrap_or("")); + let id = u64::from_str(parts.next().unwrap_or("")); + match (slot, id) { + (Ok(slot), Ok(version)) => Some((slot, version)), + _ => None, + } +} + +pub fn append_vec_iter(append_vec: &AppendVec) -> impl Iterator { + let mut offset = 0usize; + std::iter::repeat_with(move || { + append_vec.get_account(offset).map(|(_, next_offset)| { + let account = StoredAccountMetaHandle::new(append_vec, offset); + offset = next_offset; + account + }) + }) + .take_while(|account| account.is_some()) + .flatten() +} + +pub struct StoredAccountMetaHandle<'a> { + append_vec: &'a AppendVec, + offset: usize, +} + +impl<'a> StoredAccountMetaHandle<'a> { + pub const fn new(append_vec: &'a AppendVec, offset: usize) -> StoredAccountMetaHandle { + Self { append_vec, offset } + } + + pub fn access(&self) -> Option> { + Some(self.append_vec.get_account(self.offset)?.0) + } +} + +pub trait ReadProgressTracking { + fn new_read_progress_tracker( + &self, + path: &Path, + rd: Box, + file_len: u64, + ) -> SnapshotResult>; +} + +struct NoopReadProgressTracking {} + +impl ReadProgressTracking for NoopReadProgressTracking { + fn new_read_progress_tracker( + &self, + _path: &Path, + rd: Box, + _file_len: u64, + ) -> SnapshotResult> { + Ok(rd) + } +} diff --git a/simulate_from_snapshot/src/snapshot_utils/parallel.rs b/simulate_from_snapshot/src/snapshot_utils/parallel.rs new file mode 100644 index 0000000..26e76be --- /dev/null +++ b/simulate_from_snapshot/src/snapshot_utils/parallel.rs @@ -0,0 +1,38 @@ +use { + crate::snapshot_utils::{AppendVec, AppendVecIterator}, + tokio::task::JoinSet, +}; + +#[async_trait::async_trait] +pub trait AppendVecConsumer { + async fn on_append_vec(&mut self, append_vec: AppendVec) -> anyhow::Result<()>; +} + +pub async fn par_iter_append_vecs( + iterator: AppendVecIterator<'_>, + create_consumer: F, + num_threads: usize, +) -> anyhow::Result<()> +where + F: Fn() -> A, + A: AppendVecConsumer + Send + 'static, +{ + let mut tasks = JoinSet::new(); + for append_vec in iterator { + let mut consumer = if tasks.len() >= num_threads { + tasks.join_next().await.expect("checked")?? + } else { + create_consumer() + }; + + tasks.spawn(async move { + consumer.on_append_vec(append_vec?).await?; + Ok::<_, anyhow::Error>(consumer) + }); + } + while let Some(result) = tasks.join_next().await { + result??; + } + + Ok(()) +} diff --git a/simulate_from_snapshot/src/snapshot_utils/solana.rs b/simulate_from_snapshot/src/snapshot_utils/solana.rs new file mode 100644 index 0000000..03746d2 --- /dev/null +++ b/simulate_from_snapshot/src/snapshot_utils/solana.rs @@ -0,0 +1,127 @@ +// Copyright 2022 Solana Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file contains code vendored from https://github.com/solana-labs/solana + +use bincode::Options; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use solana_accounts_db::account_storage::meta::StoredMetaWriteVersion; +use solana_accounts_db::accounts_db::BankHashStats; +use solana_accounts_db::ancestors::AncestorsForSerialization; +use solana_accounts_db::blockhash_queue::BlockhashQueue; +use solana_accounts_db::rent_collector::RentCollector; +use solana_frozen_abi_macro::AbiExample; +use solana_runtime::epoch_stakes::EpochStakes; +use solana_runtime::stakes::Stakes; +use solana_sdk::clock::{Epoch, UnixTimestamp}; +use solana_sdk::deserialize_utils::default_on_eof; +use solana_sdk::epoch_schedule::EpochSchedule; +use solana_sdk::fee_calculator::{FeeCalculator, FeeRateGovernor}; +use solana_sdk::hard_forks::HardForks; +use solana_sdk::hash::Hash; +use solana_sdk::inflation::Inflation; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::slot_history::Slot; +use solana_sdk::stake::state::Delegation; +use std::collections::{HashMap, HashSet}; +use std::io::Read; + +const MAX_STREAM_SIZE: u64 = 32 * 1024 * 1024 * 1024; + +pub fn deserialize_from(reader: R) -> bincode::Result +where + R: Read, + T: DeserializeOwned, +{ + bincode::options() + .with_limit(MAX_STREAM_SIZE) + .with_fixint_encoding() + .allow_trailing_bytes() + .deserialize_from::(reader) +} + +#[derive(Default, PartialEq, Eq, Debug, Deserialize)] +struct UnusedAccounts { + unused1: HashSet, + unused2: HashSet, + unused3: HashMap, +} + +#[derive(Deserialize)] +#[allow(dead_code)] +pub struct DeserializableVersionedBank { + pub blockhash_queue: BlockhashQueue, + pub ancestors: AncestorsForSerialization, + pub hash: Hash, + pub parent_hash: Hash, + pub parent_slot: Slot, + pub hard_forks: HardForks, + pub transaction_count: u64, + pub tick_height: u64, + pub signature_count: u64, + pub capitalization: u64, + pub max_tick_height: u64, + pub hashes_per_tick: Option, + pub ticks_per_slot: u64, + pub ns_per_slot: u128, + pub genesis_creation_time: UnixTimestamp, + pub slots_per_year: f64, + pub accounts_data_len: u64, + pub slot: Slot, + pub epoch: Epoch, + pub block_height: u64, + pub collector_id: Pubkey, + pub collector_fees: u64, + pub fee_calculator: FeeCalculator, + pub fee_rate_governor: FeeRateGovernor, + pub collected_rent: u64, + pub rent_collector: RentCollector, + pub epoch_schedule: EpochSchedule, + pub inflation: Inflation, + pub stakes: Stakes, + #[allow(dead_code)] + unused_accounts: UnusedAccounts, + pub epoch_stakes: HashMap, + pub is_delta: bool, +} + +#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq, AbiExample)] +pub struct BankHashInfo { + pub hash: Hash, + pub snapshot_hash: Hash, + pub stats: BankHashStats, +} + +#[derive(Clone, Debug, Default, Deserialize, PartialEq)] +pub struct AccountsDbFields( + pub HashMap>, + pub StoredMetaWriteVersion, + pub Slot, + pub BankHashInfo, + /// all slots that were roots within the last epoch + #[serde(deserialize_with = "default_on_eof")] + pub Vec, + /// slots that were roots within the last epoch for which we care about the hash value + #[serde(deserialize_with = "default_on_eof")] + pub Vec<(Slot, Hash)>, +); + +pub type SerializedAppendVecId = usize; + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize)] +pub struct SerializableAccountStorageEntry { + pub id: SerializedAppendVecId, + pub accounts_current_len: usize, +} diff --git a/simulate_from_snapshot/src/snapshot_utils/unpacked.rs b/simulate_from_snapshot/src/snapshot_utils/unpacked.rs new file mode 100644 index 0000000..780e9b9 --- /dev/null +++ b/simulate_from_snapshot/src/snapshot_utils/unpacked.rs @@ -0,0 +1,125 @@ +use { + crate::snapshot_utils::{ + deserialize_from, parse_append_vec_name, AccountsDbFields, AppendVec, AppendVecIterator, + DeserializableVersionedBank, ReadProgressTracking, SerializableAccountStorageEntry, + SnapshotError, SnapshotExtractor, SnapshotResult, SNAPSHOTS_DIR, + }, + itertools::Itertools, + log::info, + solana_runtime::snapshot_utils::SNAPSHOT_STATUS_CACHE_FILENAME, + std::{ + fs::OpenOptions, + io::BufReader, + path::{Path, PathBuf}, + str::FromStr, + time::Instant, + }, +}; + +/// Extracts account data from snapshots that were unarchived to a file system. +pub struct UnpackedSnapshotExtractor { + root: PathBuf, + accounts_db_fields: AccountsDbFields, +} + +impl SnapshotExtractor for UnpackedSnapshotExtractor { + fn iter(&mut self) -> AppendVecIterator<'_> { + Box::new(self.unboxed_iter()) + } +} + +impl UnpackedSnapshotExtractor { + pub fn open( + path: &Path, + progress_tracking: Box, + ) -> SnapshotResult { + let snapshots_dir = path.join(SNAPSHOTS_DIR); + let status_cache = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME); + if !status_cache.is_file() { + return Err(SnapshotError::NoStatusCache); + } + + let snapshot_files = snapshots_dir.read_dir()?; + + let snapshot_file_path = snapshot_files + .filter_map(|entry| entry.ok()) + .find(|entry| u64::from_str(&entry.file_name().to_string_lossy()).is_ok()) + .map(|entry| entry.path().join(entry.file_name())) + .ok_or(SnapshotError::NoSnapshotManifest)?; + + info!("Opening snapshot manifest: {:?}", snapshot_file_path); + let snapshot_file = OpenOptions::new().read(true).open(&snapshot_file_path)?; + let snapshot_file_len = snapshot_file.metadata()?.len(); + + let snapshot_file = progress_tracking.new_read_progress_tracker( + &snapshot_file_path, + Box::new(snapshot_file), + snapshot_file_len, + )?; + let mut snapshot_file = BufReader::new(snapshot_file); + + let pre_unpack = Instant::now(); + let versioned_bank: DeserializableVersionedBank = deserialize_from(&mut snapshot_file)?; + drop(versioned_bank); + let versioned_bank_post_time = Instant::now(); + + let accounts_db_fields: AccountsDbFields = + deserialize_from(&mut snapshot_file)?; + let accounts_db_fields_post_time = Instant::now(); + drop(snapshot_file); + + info!( + "Read bank fields in {:?}", + versioned_bank_post_time - pre_unpack + ); + info!( + "Read accounts DB fields in {:?}", + accounts_db_fields_post_time - versioned_bank_post_time + ); + + Ok(UnpackedSnapshotExtractor { + root: path.to_path_buf(), + accounts_db_fields, + }) + } + + pub fn unboxed_iter(&self) -> impl Iterator> + '_ { + std::iter::once(self.iter_streams()) + .flatten_ok() + .flatten_ok() + } + + fn iter_streams(&self) -> SnapshotResult> + '_> { + let accounts_dir = self.root.join("accounts"); + Ok(accounts_dir + .read_dir()? + .filter_map(|f| f.ok()) + .filter_map(|f| { + let name = f.file_name(); + parse_append_vec_name(&f.file_name()).map(move |parsed| (parsed, name)) + }) + .map(move |((slot, version), name)| { + self.open_append_vec(slot, version, &accounts_dir.join(name)) + })) + } + + fn open_append_vec(&self, slot: u64, id: u64, path: &Path) -> SnapshotResult { + let known_vecs = self + .accounts_db_fields + .0 + .get(&slot) + .map(|v| &v[..]) + .unwrap_or(&[]); + let known_vec = known_vecs.iter().find(|entry| entry.id == (id as usize)); + let known_vec = match known_vec { + None => return Err(SnapshotError::UnexpectedAppendVec), + Some(v) => v, + }; + + Ok(AppendVec::new_from_file( + path, + known_vec.accounts_current_len, + slot, + )?) + } +} diff --git a/token_account_storage/src/inmemory_token_storage.rs b/token_account_storage/src/inmemory_token_storage.rs index 496df50..7db27a7 100644 --- a/token_account_storage/src/inmemory_token_storage.rs +++ b/token_account_storage/src/inmemory_token_storage.rs @@ -602,7 +602,11 @@ impl AccountStorageInterface for TokenProgramAccountsStorage { ) { Ok(res) => res, Err(e) => { - log::error!("Token program account {} was not able to identified {}", account_data.pubkey.to_string(), e); + log::error!( + "Token program account {} was not able to identified {}", + account_data.pubkey.to_string(), + e + ); return; } }; @@ -624,7 +628,7 @@ impl AccountStorageInterface for TokenProgramAccountsStorage { commitment, self.confirmed_slot.load(Ordering::Relaxed), ) - .get(0) + .first() .unwrap() { Some((processed_account, slot)) => Ok(token_program_account_to_solana_account( diff --git a/token_account_storage/src/token_program_utils.rs b/token_account_storage/src/token_program_utils.rs index dc19515..f69370f 100644 --- a/token_account_storage/src/token_program_utils.rs +++ b/token_account_storage/src/token_program_utils.rs @@ -94,7 +94,9 @@ pub fn get_token_program_account_type( match type_account { 0 => { //mint - let mint = spl_token_2022::state::Mint::unpack_unchecked(&account_data.account.data.data())?; + let mint = spl_token_2022::state::Mint::unpack_unchecked( + &account_data.account.data.data(), + )?; Ok(TokenProgramAccountType::Mint(MintAccount { program: crate::account_types::Program::Token2022Program, pubkey: account_data.pubkey, @@ -108,8 +110,9 @@ pub fn get_token_program_account_type( })) } 1 => { - let token_account = - spl_token_2022::state::Account::unpack_unchecked(&account_data.account.data.data())?; + let token_account = spl_token_2022::state::Account::unpack_unchecked( + &account_data.account.data.data(), + )?; let mint_index = get_or_create_mint_index( token_account.mint, mint_index_by_pubkey, @@ -143,8 +146,9 @@ pub fn get_token_program_account_type( })) } 2 => { - let multi_sig = - spl_token_2022::state::Multisig::unpack_unchecked(&account_data.account.data.data())?; + let multi_sig = spl_token_2022::state::Multisig::unpack_unchecked( + &account_data.account.data.data(), + )?; Ok(TokenProgramAccountType::MultiSig( MultiSig { program: crate::account_types::Program::Token2022Program, @@ -215,7 +219,8 @@ pub fn get_token_program_account_type( })) } else { // multisig - let multi_sig = spl_token::state::Multisig::unpack_unchecked(&account_data.account.data.data())?; + let multi_sig = + spl_token::state::Multisig::unpack_unchecked(&account_data.account.data.data())?; Ok(TokenProgramAccountType::MultiSig( MultiSig { program: crate::account_types::Program::TokenProgram,