Multi-version snapshot support (#9980)

* Multi-version snapshot support

* rustfmt

* Remove CLI options and runtime support for selecting the output snapshot version.
Address some clippy complaints.

* Muzzle clippy type complexity warning.

Despite clippy's suggestion, it is not currently possible to create type aliases
for traits and so everything within the 'Box<...>' cannot be type aliased.

This then leaves creating full blown traits, and either implementing
said traits by closure (somehow) or moving the closures into new structs
implementing said traits which seems a bit of a palaver.

Alternatively it is possible to define and use the type alias 'type ResultBox<T> = Result<Box<T>>',
which does seem rather pointless and is not a great reduction in complexity, but is enough to keep clippy happy.

In the end I simply went with squelching the clippy warning.

* Remove now unused Serialize/Deserialize trait implementations for AccountStorageEntry and AppendVec

* refactor versioned de/serialisers

* rename serde_utils to serde_snapshot

* move call to accounts_db.generate_index() back down to context_accountsdb_from_stream()

* update version 1.1.1 to 1.2.0
remove nested use of serialize_bytes

* cleanups

* Add back measurement of account storage entry serialization.
Remove construction of Vec and HashMap temporaries during serialization.

* consolidate serialisation test cases into serde_snapshot.
clean up leakage of implementation details in serde_snapshot.

* move short term / legacy snapshot code into child module

* add serialize_iter_as_tuple

* preliminary integration of following commit

commit 6d58b73c47294bfb93465d5a83cd2175660b6e6d
Author: Ryo Onodera <ryoqun@gmail.com>
Date:   Wed May 20 14:02:02 2020 +0900

    Confine snapshot 1.1 relic to versioned codepath

* refactored serde_snapshot, rustfmt
legacy accounts_db format now "owns" both leading u64s, legacy bank_rc format has none

* reduce type complexity (clippy)
This commit is contained in:
Kristofer Peterson 2020-05-22 18:54:24 +01:00 committed by GitHub
parent 967320a091
commit b7a32f01c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1217 additions and 626 deletions

View File

@ -83,9 +83,7 @@ mod tests {
snapshot_package::AccountsPackage,
snapshot_utils::{self, SNAPSHOT_STATUS_CACHE_FILE_NAME},
};
use solana_runtime::{
accounts_db::AccountStorageEntry, bank::BankSlotDelta, bank::MAX_SNAPSHOT_DATA_FILE_SIZE,
};
use solana_runtime::{accounts_db::AccountStorageEntry, bank::BankSlotDelta};
use solana_sdk::hash::Hash;
use std::{
fs::{self, remove_dir_all, OpenOptions},
@ -186,7 +184,6 @@ mod tests {
let dummy_slot_deltas: Vec<BankSlotDelta> = vec![];
snapshot_utils::serialize_snapshot_data_file(
&snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
serialize_into(stream, &dummy_slot_deltas)?;
Ok(())

View File

@ -344,7 +344,6 @@ mod tests {
&saved_snapshots_dir
.path()
.join(snapshot_utils::SNAPSHOT_STATUS_CACHE_FILE_NAME),
solana_runtime::bank::MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
serialize_into(stream, &dummy_slot_deltas)?;
Ok(())

View File

@ -9,10 +9,9 @@ use log::*;
use regex::Regex;
use solana_measure::measure::Measure;
use solana_runtime::{
accounts_db::{SnapshotStorage, SnapshotStorages},
bank::{
self, deserialize_from_snapshot, Bank, BankRcSerialize, BankSlotDelta,
MAX_SNAPSHOT_DATA_FILE_SIZE,
bank::{Bank, BankSlotDelta},
serde_snapshot::{
bankrc_from_stream, bankrc_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages,
},
};
use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey};
@ -22,6 +21,8 @@ use std::{
io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
process::ExitStatus,
str::FromStr,
sync::Arc,
};
use tar::Archive;
use tempfile::TempDir;
@ -32,7 +33,47 @@ pub const TAR_SNAPSHOTS_DIR: &str = "snapshots";
pub const TAR_ACCOUNTS_DIR: &str = "accounts";
pub const TAR_VERSION_FILE: &str = "version";
pub const SNAPSHOT_VERSION: &str = "1.1.0";
const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
const VERSION_STRING_V1_1_0: &str = "1.1.0";
const VERSION_STRING_V1_2_0: &str = "1.2.0";
const OUTPUT_SNAPSHOT_VERSION: SnapshotVersion = SnapshotVersion::V1_2_0;
/// On-disk snapshot format versions this code can read.
/// Output is pinned to OUTPUT_SNAPSHOT_VERSION (currently V1_2_0).
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum SnapshotVersion {
    /// Legacy "1.1.0" format, deserialized via SerdeStyle::OLDER.
    V1_1_0,
    /// Current "1.2.0" format, deserialized via SerdeStyle::NEWER.
    V1_2_0,
}
impl From<SnapshotVersion> for &'static str {
    /// Map a snapshot version to its canonical on-disk version string.
    fn from(version: SnapshotVersion) -> &'static str {
        if version == SnapshotVersion::V1_1_0 {
            VERSION_STRING_V1_1_0
        } else {
            VERSION_STRING_V1_2_0
        }
    }
}
impl FromStr for SnapshotVersion {
type Err = &'static str;
fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
match version_string {
VERSION_STRING_V1_1_0 => Ok(SnapshotVersion::V1_1_0),
VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
_ => Err("unsupported snapshot version"),
}
}
}
impl SnapshotVersion {
pub fn as_str(self) -> &'static str {
<&str as From<Self>>::from(self)
}
fn maybe_from_string(version_string: &str) -> Option<SnapshotVersion> {
version_string.parse::<Self>().ok()
}
}
#[derive(PartialEq, Ord, Eq, Debug)]
pub struct SlotSnapshotPaths {
@ -192,7 +233,7 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<()
// Write version file
{
let mut f = std::fs::File::create(staging_version_file)?;
f.write_all(&SNAPSHOT_VERSION.to_string().into_bytes())?;
f.write_all(OUTPUT_SNAPSHOT_VERSION.as_str().as_bytes())?;
}
let (compression_option, file_ext) = get_compression_ext(&snapshot_package.compression);
@ -289,13 +330,35 @@ where
}
}
pub fn serialize_snapshot_data_file<F>(
/// Serialize snapshot data to `data_file_path`, capped at the global
/// MAX_SNAPSHOT_DATA_FILE_SIZE limit. Returns the number of bytes written.
pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
where
    F: FnOnce(&mut BufWriter<File>) -> Result<()>,
{
    // Delegate to the capped variant; the type parameter is inferred.
    serialize_snapshot_data_file_capped(data_file_path, MAX_SNAPSHOT_DATA_FILE_SIZE, serializer)
}
/// Deserialize snapshot data from `data_file_path`, rejecting files larger
/// than the global MAX_SNAPSHOT_DATA_FILE_SIZE limit.
pub fn deserialize_snapshot_data_file<F, T>(data_file_path: &Path, deserializer: F) -> Result<T>
where
    F: FnOnce(&mut BufReader<File>) -> Result<T>,
{
    // Delegate to the capped variant; the type parameters are inferred.
    deserialize_snapshot_data_file_capped(data_file_path, MAX_SNAPSHOT_DATA_FILE_SIZE, deserializer)
}
fn serialize_snapshot_data_file_capped<F>(
data_file_path: &Path,
maximum_file_size: u64,
mut serializer: F,
serializer: F,
) -> Result<u64>
where
F: FnMut(&mut BufWriter<File>) -> Result<()>,
F: FnOnce(&mut BufWriter<File>) -> Result<()>,
{
let data_file = File::create(data_file_path)?;
let mut data_file_stream = BufWriter::new(data_file);
@ -313,13 +376,13 @@ where
Ok(consumed_size)
}
pub fn deserialize_snapshot_data_file<F, T>(
fn deserialize_snapshot_data_file_capped<F, T>(
data_file_path: &Path,
maximum_file_size: u64,
mut deserializer: F,
deserializer: F,
) -> Result<T>
where
F: FnMut(&mut BufReader<File>) -> Result<T>,
F: FnOnce(&mut BufReader<File>) -> Result<T>,
{
let file_size = fs::metadata(&data_file_path)?.len();
@ -367,21 +430,18 @@ pub fn add_snapshot<P: AsRef<Path>>(
);
let mut bank_serialize = Measure::start("bank-serialize-ms");
let consumed_size = serialize_snapshot_data_file(
&snapshot_bank_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
serialize_into(stream.by_ref(), &*bank)?;
serialize_into(
stream.by_ref(),
&BankRcSerialize {
bank_rc: &bank.rc,
snapshot_storages,
},
)?;
Ok(())
},
)?;
let bank_snapshot_serializer = move |stream: &mut BufWriter<File>| -> Result<()> {
serialize_into(stream.by_ref(), bank)?;
bankrc_to_stream(
SerdeStyle::NEWER,
stream.by_ref(),
&bank.rc,
snapshot_storages,
)?;
Ok(())
};
let consumed_size =
serialize_snapshot_data_file(&snapshot_bank_file_path, bank_snapshot_serializer)?;
bank_serialize.stop();
// Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
@ -414,14 +474,10 @@ pub fn serialize_status_cache(
snapshot_links.path().join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
let mut status_cache_serialize = Measure::start("status_cache_serialize-ms");
let consumed_size = serialize_snapshot_data_file(
&snapshot_status_cache_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
serialize_into(stream, slot_deltas)?;
Ok(())
},
)?;
let consumed_size = serialize_snapshot_data_file(&snapshot_status_cache_file_path, |stream| {
serialize_into(stream, slot_deltas)?;
Ok(())
})?;
status_cache_serialize.stop();
// Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
@ -624,6 +680,13 @@ where
{
info!("snapshot version: {}", snapshot_version);
let snapshot_version_enum =
SnapshotVersion::maybe_from_string(snapshot_version).ok_or_else(|| {
get_io_error(&format!(
"unsupported snapshot version: {}",
snapshot_version
))
})?;
let mut snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
if snapshot_paths.len() > 1 {
return Err(get_io_error("invalid snapshot format"));
@ -633,45 +696,47 @@ where
.ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;
info!("Loading bank from {:?}", &root_paths.snapshot_file_path);
let bank = deserialize_snapshot_data_file(
&root_paths.snapshot_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
let mut bank: Bank = match snapshot_version {
SNAPSHOT_VERSION => deserialize_from_snapshot(stream.by_ref())?,
_ => {
return Err(get_io_error(&format!(
"unsupported snapshot version: {}",
snapshot_version
)));
}
};
bank.operating_mode = Some(genesis_config.operating_mode);
info!("Rebuilding accounts...");
let rc = bank::BankRc::from_stream(
let bank = deserialize_snapshot_data_file(&root_paths.snapshot_file_path, |mut stream| {
let mut bank: Bank = bincode::config()
.limit(MAX_SNAPSHOT_DATA_FILE_SIZE)
.deserialize_from(&mut stream)?;
info!("Rebuilding accounts...");
let mut bankrc = match snapshot_version_enum {
SnapshotVersion::V1_1_0 => bankrc_from_stream(
SerdeStyle::OLDER,
account_paths,
bank.slot(),
&bank.ancestors,
frozen_account_pubkeys,
stream.by_ref(),
&mut stream,
&append_vecs_path,
)?;
bank.set_bank_rc(rc, bank::StatusCacheRc::default());
bank.finish_init();
Ok(bank)
},
)?;
),
SnapshotVersion::V1_2_0 => bankrc_from_stream(
SerdeStyle::NEWER,
account_paths,
bank.slot(),
&mut stream,
&append_vecs_path,
),
}?;
Arc::get_mut(&mut Arc::get_mut(&mut bankrc.accounts).unwrap().accounts_db)
.unwrap()
.freeze_accounts(&bank.ancestors, frozen_account_pubkeys);
bank.rc = bankrc;
bank.operating_mode = Some(genesis_config.operating_mode);
bank.finish_init();
Ok(bank)
})?;
let status_cache_path = unpacked_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
let slot_deltas = deserialize_snapshot_data_file(
&status_cache_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
info!("Rebuilding status cache...");
let slot_deltas: Vec<BankSlotDelta> = deserialize_from_snapshot(stream)?;
Ok(slot_deltas)
},
)?;
let slot_deltas = deserialize_snapshot_data_file(&status_cache_path, |stream| {
info!("Rebuilding status cache...");
let slot_deltas: Vec<BankSlotDelta> = bincode::config()
.limit(MAX_SNAPSHOT_DATA_FILE_SIZE)
.deserialize_from(stream)?;
Ok(slot_deltas)
})?;
bank.src.append(&slot_deltas);
@ -726,7 +791,7 @@ mod tests {
fn test_serialize_snapshot_data_file_under_limit() {
let temp_dir = tempfile::TempDir::new().unwrap();
let expected_consumed_size = size_of::<u32>() as u64;
let consumed_size = serialize_snapshot_data_file(
let consumed_size = serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size,
|stream| {
@ -742,7 +807,7 @@ mod tests {
fn test_serialize_snapshot_data_file_over_limit() {
let temp_dir = tempfile::TempDir::new().unwrap();
let expected_consumed_size = size_of::<u32>() as u64;
let result = serialize_snapshot_data_file(
let result = serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size - 1,
|stream| {
@ -759,7 +824,7 @@ mod tests {
let expected_consumed_size = size_of::<u32>() as u64;
let temp_dir = tempfile::TempDir::new().unwrap();
serialize_snapshot_data_file(
serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size,
|stream| {
@ -769,7 +834,7 @@ mod tests {
)
.unwrap();
let actual_data = deserialize_snapshot_data_file(
let actual_data = deserialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size,
|stream| Ok(deserialize_from::<_, u32>(stream)?),
@ -784,7 +849,7 @@ mod tests {
let expected_consumed_size = size_of::<u32>() as u64;
let temp_dir = tempfile::TempDir::new().unwrap();
serialize_snapshot_data_file(
serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size,
|stream| {
@ -794,7 +859,7 @@ mod tests {
)
.unwrap();
let result = deserialize_snapshot_data_file(
let result = deserialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size - 1,
|stream| Ok(deserialize_from::<_, u32>(stream)?),
@ -808,7 +873,7 @@ mod tests {
let expected_consumed_size = size_of::<u32>() as u64;
let temp_dir = tempfile::TempDir::new().unwrap();
serialize_snapshot_data_file(
serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size * 2,
|stream| {
@ -819,7 +884,7 @@ mod tests {
)
.unwrap();
let result = deserialize_snapshot_data_file(
let result = deserialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size * 2,
|stream| Ok(deserialize_from::<_, u32>(stream)?),

View File

@ -374,7 +374,12 @@ mod bpf {
assert!(bank_client
.send_message(
&[&mint_keypair, &argument_keypair, &invoked_argument_keypair, &from_keypair],
&[
&mint_keypair,
&argument_keypair,
&invoked_argument_keypair,
&from_keypair
],
message,
)
.is_ok());

View File

@ -26,14 +26,13 @@ use solana_sdk::{
};
use std::{
collections::{HashMap, HashSet},
io::{BufReader, Error as IOError, Read},
ops::RangeBounds,
path::{Path, PathBuf},
path::PathBuf,
sync::{Arc, Mutex, RwLock},
};
#[derive(Default, Debug)]
struct ReadonlyLock {
pub(crate) struct ReadonlyLock {
lock_count: Mutex<u64>,
}
@ -47,10 +46,10 @@ pub struct Accounts {
pub accounts_db: Arc<AccountsDB>,
/// set of writable accounts which are currently in the pipeline
account_locks: Mutex<HashSet<Pubkey>>,
pub(crate) account_locks: Mutex<HashSet<Pubkey>>,
/// Set of read-only accounts which are currently in the pipeline, caching number of locks.
readonly_locks: Arc<RwLock<Option<HashMap<Pubkey, ReadonlyLock>>>>,
pub(crate) readonly_locks: Arc<RwLock<Option<HashMap<Pubkey, ReadonlyLock>>>>,
}
// for the load instructions
@ -66,6 +65,15 @@ pub enum AccountAddressFilter {
}
impl Accounts {
/// Wrap an already-constructed AccountsDB in an Accounts with fresh (empty)
/// lock state; any remaining fields are filled from Default.
pub(crate) fn new_empty(accounts_db: AccountsDB) -> Self {
    Self {
        accounts_db: Arc::new(accounts_db),
        account_locks: Mutex::new(HashSet::new()),
        // NOTE(review): the Option wrapper appears to gate the read-only
        // lock cache; Some(empty map) enables it — confirm with users of
        // readonly_locks elsewhere in this file.
        readonly_locks: Arc::new(RwLock::new(Some(HashMap::new()))),
        ..Self::default()
    }
}
/// Construct Accounts over the given storage paths with no frozen accounts.
pub fn new(paths: Vec<PathBuf>) -> Self {
    Self::new_with_frozen_accounts(paths, &HashMap::default(), &[])
}
@ -102,25 +110,6 @@ impl Accounts {
.freeze_accounts(ancestors, frozen_account_pubkeys);
}
/// Reconstruct Accounts from a serialized snapshot stream (legacy path).
///
/// `stream_append_vecs_path` is the directory holding the unpacked AppendVec
/// files that the deserialized storage entries are remapped onto.
pub fn from_stream<R: Read, P: AsRef<Path>>(
    account_paths: &[PathBuf],
    ancestors: &Ancestors,
    frozen_account_pubkeys: &[Pubkey],
    stream: &mut BufReader<R>,
    stream_append_vecs_path: P,
) -> std::result::Result<Self, IOError> {
    let mut accounts_db = AccountsDB::new(account_paths.to_vec());
    accounts_db.accounts_from_stream(stream, stream_append_vecs_path)?;
    // Freeze the requested accounts before the db is shared behind an Arc.
    accounts_db.freeze_accounts(ancestors, frozen_account_pubkeys);
    Ok(Accounts {
        slot: 0,
        accounts_db: Arc::new(accounts_db),
        account_locks: Mutex::new(HashSet::new()),
        readonly_locks: Arc::new(RwLock::new(Some(HashMap::new()))),
    })
}
/// Return true if the slice has any duplicate elements
pub fn has_duplicates<T: PartialEq>(xs: &[T]) -> bool {
// Note: This is an O(n^2) algorithm, but requires no heap allocations. The benchmark
@ -787,16 +776,7 @@ mod tests {
// TODO: all the bank tests are bank specific, issue: 2194
use super::*;
use crate::{
accounts_db::{
tests::copy_append_vecs,
{get_temp_accounts_paths, AccountsDBSerialize},
},
bank::HashAgeKind,
rent_collector::RentCollector,
};
use bincode::serialize_into;
use rand::{thread_rng, Rng};
use crate::{bank::HashAgeKind, rent_collector::RentCollector};
use solana_sdk::{
account::Account,
epoch_schedule::EpochSchedule,
@ -811,11 +791,9 @@ mod tests {
transaction::Transaction,
};
use std::{
io::Cursor,
sync::atomic::{AtomicBool, AtomicU64, Ordering},
{thread, time},
};
use tempfile::TempDir;
fn load_accounts_with_fee_and_rent(
tx: Transaction,
@ -1472,61 +1450,6 @@ mod tests {
accounts.bank_hash_at(1);
}
// Spot-check randomly chosen accounts: pubkeys[idx] is expected to hold
// (idx + 1) lamports at slot 0 with the default owner.
// NOTE(review): presumably mirrors how create_test_accounts populates the
// accounts — confirm against that helper.
fn check_accounts(accounts: &Accounts, pubkeys: &[Pubkey], num: usize) {
    for _ in 1..num {
        let idx = thread_rng().gen_range(0, num - 1);
        let ancestors = vec![(0, 0)].into_iter().collect();
        let account = accounts.load_slow(&ancestors, &pubkeys[idx]);
        let account1 = Some((
            Account::new((idx + 1) as u64, 0, &Account::default().owner),
            0,
        ));
        assert_eq!(account, account1);
    }
}
#[test]
// Round-trip test: serialize an AccountsDB snapshot into a buffer, copy its
// AppendVecs aside (simulating tarball extraction), deserialize into a fresh
// Accounts via from_stream, and verify accounts and bank hash match.
fn test_accounts_serialize() {
    solana_logger::setup();
    let (_accounts_dir, paths) = get_temp_accounts_paths(4).unwrap();
    let accounts = Accounts::new(paths);

    let mut pubkeys: Vec<Pubkey> = vec![];
    create_test_accounts(&accounts, &mut pubkeys, 100, 0);
    check_accounts(&accounts, &pubkeys, 100);
    accounts.add_root(0);

    let mut writer = Cursor::new(vec![]);
    serialize_into(
        &mut writer,
        &AccountsDBSerialize::new(
            &*accounts.accounts_db,
            0,
            &accounts.accounts_db.get_snapshot_storages(0),
        ),
    )
    .unwrap();

    let copied_accounts = TempDir::new().unwrap();

    // Simulate obtaining a copy of the AppendVecs from a tarball
    copy_append_vecs(&accounts.accounts_db, copied_accounts.path()).unwrap();

    let buf = writer.into_inner();
    let mut reader = BufReader::new(&buf[..]);
    // Deserialize into different storage paths than the source used.
    let (_accounts_dir, daccounts_paths) = get_temp_accounts_paths(2).unwrap();
    let daccounts = Accounts::from_stream(
        &daccounts_paths,
        &HashMap::default(),
        &[],
        &mut reader,
        copied_accounts.path(),
    )
    .unwrap();
    check_accounts(&daccounts, &pubkeys, 100);
    assert_eq!(accounts.bank_hash_at(0), daccounts.bank_hash_at(0));
}
#[test]
fn test_accounts_locks() {
let keypair0 = Keypair::new();

View File

@ -21,20 +21,13 @@
use crate::{
accounts_index::{AccountsIndex, Ancestors, SlotList, SlotSlice},
append_vec::{AppendVec, StoredAccount, StoredMeta},
bank::deserialize_from_snapshot,
};
use bincode::{deserialize_from, serialize_into};
use byteorder::{ByteOrder, LittleEndian};
use fs_extra::dir::CopyOptions;
use lazy_static::lazy_static;
use log::*;
use rand::{thread_rng, Rng};
use rayon::{prelude::*, ThreadPool};
use serde::{
de::{MapAccess, Visitor},
ser::{SerializeMap, Serializer},
Deserialize, Serialize,
};
use serde::{Deserialize, Serialize};
use solana_measure::measure::Measure;
use solana_rayon_threadlimit::get_thread_count;
use solana_sdk::{
@ -45,8 +38,7 @@ use solana_sdk::{
};
use std::{
collections::{HashMap, HashSet},
fmt,
io::{BufReader, Cursor, Error as IOError, ErrorKind, Read, Result as IOResult},
io::{Error as IOError, Result as IOResult},
ops::RangeBounds,
path::{Path, PathBuf},
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
@ -100,35 +92,7 @@ pub type SnapshotStorage = Vec<Arc<AccountStorageEntry>>;
pub type SnapshotStorages = Vec<SnapshotStorage>;
// Each slot has a set of storage entries.
type SlotStores = HashMap<usize, Arc<AccountStorageEntry>>;
// serde visitor that rebuilds AccountStorage from a serialized map of
// slot -> Vec<AccountStorageEntry>.
struct AccountStorageVisitor;

impl<'de> Visitor<'de> for AccountStorageVisitor {
    type Value = AccountStorage;

    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str("Expecting AccountStorage")
    }

    #[allow(clippy::mutex_atomic)]
    fn visit_map<M>(self, mut access: M) -> Result<Self::Value, M::Error>
    where
        M: MapAccess<'de>,
    {
        let mut map = HashMap::new();
        while let Some((slot, storage_entries)) = access.next_entry()? {
            let storage_entries: Vec<AccountStorageEntry> = storage_entries;
            let storage_slot_map = map.entry(slot).or_insert_with(HashMap::new);
            for mut storage in storage_entries {
                // `slot` is #[serde(skip)] on the entry, so restore it from
                // the map key before the entry is shared.
                storage.slot = slot;
                storage_slot_map.insert(storage.id, Arc::new(storage));
            }
        }
        Ok(AccountStorage(map))
    }
}
pub(crate) type SlotStores = HashMap<usize, Arc<AccountStorageEntry>>;
trait Versioned {
fn version(&self) -> u64;
@ -146,32 +110,6 @@ impl Versioned for (u64, AccountInfo) {
}
}
// Serialization adapter: writes the snapshot storages as a map keyed by each
// group's slot, emitting a "serialize_account_storage_ms" datapoint.
struct AccountStorageSerialize<'a> {
    account_storage_entries: &'a [SnapshotStorage],
}

impl<'a> Serialize for AccountStorageSerialize<'a> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut map = serializer.serialize_map(Some(self.account_storage_entries.len()))?;
        let mut count = 0;
        let mut serialize_account_storage_timer = Measure::start("serialize_account_storage_ms");
        for storage_entries in self.account_storage_entries {
            // Each group shares one slot; panics if a group is empty.
            map.serialize_entry(&storage_entries.first().unwrap().slot, storage_entries)?;
            count += storage_entries.len();
        }
        serialize_account_storage_timer.stop();
        datapoint_info!(
            "serialize_account_storage_ms",
            ("duration", serialize_account_storage_timer.as_ms(), i64),
            ("num_entries", count, i64),
        );
        map.end()
    }
}
#[derive(Clone, Default, Debug)]
pub struct AccountStorage(pub HashMap<Slot, SlotStores>);
@ -193,22 +131,19 @@ impl AccountStorage {
}
}
impl<'de> Deserialize<'de> for AccountStorage {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_map(AccountStorageVisitor)
}
}
#[derive(Debug, PartialEq, Copy, Clone, Deserialize, Serialize)]
#[derive(Debug, Eq, PartialEq, Copy, Clone, Deserialize, Serialize)]
pub enum AccountStorageStatus {
Available = 0,
Full = 1,
Candidate = 2,
}
impl Default for AccountStorageStatus {
fn default() -> Self {
Self::Available
}
}
#[derive(Debug)]
pub enum BankHashVerificationError {
MismatchedAccountHash,
@ -217,15 +152,14 @@ pub enum BankHashVerificationError {
}
/// Persistent storage structure holding the accounts
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug)]
pub struct AccountStorageEntry {
id: AppendVecId,
pub(crate) id: AppendVecId,
#[serde(skip)]
slot: Slot,
pub(crate) slot: Slot,
/// storage holding the accounts
accounts: AppendVec,
pub(crate) accounts: AppendVec,
/// Keeps track of the number of accounts stored in a specific AppendVec.
/// This is periodically checked to reuse the stores that do not have
@ -235,13 +169,24 @@ pub struct AccountStorageEntry {
count_and_status: RwLock<(usize, AccountStorageStatus)>,
}
impl Default for AccountStorageEntry {
    fn default() -> Self {
        Self {
            id: 0,
            slot: 0,
            // Placeholder AppendVec with no real backing file; a valid map
            // must be attached externally before the entry is usable.
            accounts: AppendVec::new_empty_map(0),
            count_and_status: RwLock::new((0, AccountStorageStatus::Available)),
        }
    }
}
impl AccountStorageEntry {
pub fn new(path: &Path, slot: Slot, id: usize, file_size: u64) -> Self {
let tail = AppendVec::new_relative_path(slot, id);
let path = Path::new(path).join(&tail);
let accounts = AppendVec::new(&path, true, file_size as usize);
AccountStorageEntry {
Self {
id,
slot,
accounts,
@ -249,6 +194,15 @@ impl AccountStorageEntry {
}
}
/// Build a storage entry whose AppendVec has no real backing mmap yet
/// (deserialization path); the caller must attach the actual file later.
/// `slot` starts at 0 and is restored by the deserializer.
pub(crate) fn new_empty_map(id: AppendVecId, accounts_current_len: usize) -> Self {
    Self {
        id,
        slot: 0,
        accounts: AppendVec::new_empty_map(accounts_current_len),
        count_and_status: RwLock::new((0, AccountStorageStatus::Available)),
    }
}
pub fn set_status(&self, mut status: AccountStorageStatus) {
let mut count_and_status = self.count_and_status.write().unwrap();
@ -365,55 +319,6 @@ pub fn get_temp_accounts_paths(count: u32) -> IOResult<(Vec<TempDir>, Vec<PathBu
Ok((temp_dirs, paths))
}
/// Serialization adapter bundling an AccountsDB with the slot and storage
/// entries to be written into a snapshot.
pub struct AccountsDBSerialize<'a, 'b> {
    accounts_db: &'a AccountsDB,
    slot: Slot,
    account_storage_entries: &'b [SnapshotStorage],
}

impl<'a, 'b> AccountsDBSerialize<'a, 'b> {
    /// Bundle the references; no work happens until `serialize` is called.
    pub fn new(
        accounts_db: &'a AccountsDB,
        slot: Slot,
        account_storage_entries: &'b [SnapshotStorage],
    ) -> Self {
        Self {
            accounts_db,
            slot,
            account_storage_entries,
        }
    }
}
impl<'a, 'b> Serialize for AccountsDBSerialize<'a, 'b> {
    // Serializes (storage map, write_version, (slot, bank hash)) into an
    // in-memory buffer, then emits that buffer as a single bytes field —
    // which is why the stream-side reader first pulls a length word.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        use serde::ser::Error;
        let mut wr = Cursor::new(vec![]);
        let version = self.accounts_db.write_version.load(Ordering::Relaxed);
        let account_storage_serialize = AccountStorageSerialize {
            account_storage_entries: self.account_storage_entries,
        };
        serialize_into(&mut wr, &account_storage_serialize).map_err(Error::custom)?;
        serialize_into(&mut wr, &version).map_err(Error::custom)?;
        let bank_hashes = self.accounts_db.bank_hashes.read().unwrap();
        serialize_into(
            &mut wr,
            &(
                self.slot,
                &*bank_hashes
                    .get(&self.slot)
                    .unwrap_or_else(|| panic!("No bank_hashes entry for slot {}", self.slot)),
            ),
        )
        .map_err(Error::custom)?;
        let len = wr.position() as usize;
        serializer.serialize_bytes(&wr.into_inner()[..len])
    }
}
#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq)]
pub struct BankHashStats {
pub num_updated_accounts: u64,
@ -473,10 +378,10 @@ pub struct AccountsDB {
pub next_id: AtomicUsize,
pub shrink_candidate_slots: Mutex<Vec<Slot>>,
write_version: AtomicU64,
pub(crate) write_version: AtomicU64,
/// Set of storage paths to pick from
paths: Vec<PathBuf>,
pub(crate) paths: Vec<PathBuf>,
/// Directory of paths this accounts_db needs to hold/remove
temp_paths: Option<Vec<TempDir>>,
@ -592,102 +497,6 @@ impl AccountsDB {
}
}
/// Populate this AccountsDB from a legacy snapshot stream.
///
/// Deserializes the account storage map, remaps every AppendVec file from
/// `stream_append_vecs_path` (the unpacked snapshot) into one of
/// `self.paths`, then restores write_version and the bank hash for the
/// serialized slot, and finally rebuilds the accounts index.
pub fn accounts_from_stream<R: Read, P: AsRef<Path>>(
    &self,
    mut stream: &mut BufReader<R>,
    stream_append_vecs_path: P,
) -> Result<(), IOError> {
    // Leading length word of the serialize_bytes envelope; value unused here.
    let _len: usize =
        deserialize_from(&mut stream).map_err(|e| AccountsDB::get_io_error(&e.to_string()))?;
    let storage: AccountStorage = deserialize_from_snapshot(&mut stream)
        .map_err(|e| AccountsDB::get_io_error(&e.to_string()))?;

    // Remap the deserialized AppendVec paths to point to correct local paths
    let new_storage_map: Result<HashMap<Slot, SlotStores>, IOError> = storage
        .0
        .into_iter()
        .map(|(slot, mut slot_storage)| {
            let mut new_slot_storage = HashMap::new();
            for (id, storage_entry) in slot_storage.drain() {
                // Scatter entries across the configured paths at random.
                let path_index = thread_rng().gen_range(0, self.paths.len());
                let local_dir = &self.paths[path_index];
                std::fs::create_dir_all(local_dir).expect("Create directory failed");

                // Move the corresponding AppendVec from the snapshot into the directory pointed
                // at by `local_dir`
                let append_vec_relative_path =
                    AppendVec::new_relative_path(slot, storage_entry.id);
                let append_vec_abs_path = stream_append_vecs_path
                    .as_ref()
                    .join(&append_vec_relative_path);
                let target = local_dir.join(append_vec_abs_path.file_name().unwrap());
                if std::fs::rename(append_vec_abs_path.clone(), target).is_err() {
                    // rename can fail (e.g. across filesystems); fall back to
                    // a copy-based move, skipping the entry on failure.
                    let mut copy_options = CopyOptions::new();
                    copy_options.overwrite = true;
                    let e = fs_extra::move_items(
                        &vec![&append_vec_abs_path],
                        &local_dir,
                        &copy_options,
                    )
                    .map_err(|e| {
                        AccountsDB::get_io_error(&format!(
                            "Unable to move {:?} to {:?}: {}",
                            append_vec_abs_path, local_dir, e
                        ))
                    });
                    if e.is_err() {
                        info!("{:?}", e);
                        continue;
                    }
                };

                // Notify the AppendVec of the new file location
                let local_path = local_dir.join(append_vec_relative_path);
                let mut u_storage_entry = Arc::try_unwrap(storage_entry).unwrap();
                u_storage_entry
                    .set_file(local_path)
                    .map_err(|e| AccountsDB::get_io_error(&e.to_string()))?;
                new_slot_storage.insert(id, Arc::new(u_storage_entry));
            }
            Ok((slot, new_slot_storage))
        })
        .collect();

    let new_storage_map = new_storage_map?;
    let mut storage = AccountStorage(new_storage_map);

    // discard any slots with no storage entries
    // this can happen if a non-root slot was serialized
    // but non-root stores should not be included in the snapshot
    storage.0.retain(|_slot, stores| !stores.is_empty());

    let version: u64 = deserialize_from(&mut stream)
        .map_err(|_| AccountsDB::get_io_error("write version deserialize error"))?;

    let (slot, bank_hash): (Slot, BankHashInfo) = deserialize_from(&mut stream)
        .map_err(|_| AccountsDB::get_io_error("bank hashes deserialize error"))?;
    self.bank_hashes.write().unwrap().insert(slot, bank_hash);

    // Process deserialized data, set necessary fields in self
    let max_id: usize = *storage
        .0
        .values()
        .flat_map(HashMap::keys)
        .max()
        .expect("At least one storage entry must exist from deserializing stream");
    {
        let mut stores = self.storage.write().unwrap();
        stores.0.extend(storage.0);
    }
    // Reserve ids above everything seen in the stream.
    self.next_id.store(max_id + 1, Ordering::Relaxed);
    // Accumulate (not overwrite) the stream's write version.
    self.write_version.fetch_add(version, Ordering::Relaxed);
    self.generate_index();
    Ok(())
}
fn new_storage_entry(&self, slot: Slot, path: &Path, size: u64) -> AccountStorageEntry {
AccountStorageEntry::new(
path,
@ -1635,7 +1444,7 @@ impl AccountsDB {
let accounts_index = self.accounts_index.read().unwrap();
let storage = self.storage.read().unwrap();
let keys: Vec<_> = accounts_index.account_maps.keys().collect();
let mismatch_found = AtomicBool::new(false);
let mismatch_found = AtomicU64::new(0);
let hashes: Vec<_> = keys
.par_iter()
.filter_map(|pubkey| {
@ -1652,9 +1461,7 @@ impl AccountsDB {
if check_hash {
let hash = Self::hash_stored_account(*slot, &account);
if hash != *account.hash {
mismatch_found.store(true, Ordering::Relaxed);
}
if mismatch_found.load(Ordering::Relaxed) {
mismatch_found.fetch_add(1, Ordering::Relaxed);
return None;
}
}
@ -1669,7 +1476,11 @@ impl AccountsDB {
}
})
.collect();
if mismatch_found.load(Ordering::Relaxed) {
if mismatch_found.load(Ordering::Relaxed) > 0 {
warn!(
"{} mismatched account hash(es) found",
mismatch_found.load(Ordering::Relaxed)
);
return Err(MismatchedAccountHash);
}
@ -2000,12 +1811,7 @@ impl AccountsDB {
}
}
// Log an AccountsDB error and wrap the message in a std::io::Error.
fn get_io_error(error: &str) -> IOError {
    warn!("AccountsDB error: {:?}", error);
    IOError::new(ErrorKind::Other, error)
}
fn generate_index(&self) {
pub fn generate_index(&self) {
let storage = self.storage.read().unwrap();
let mut slots: Vec<Slot> = storage.0.keys().cloned().collect();
slots.sort();
@ -2085,14 +1891,11 @@ impl AccountsDB {
pub mod tests {
// TODO: all the bank tests are bank specific, issue: 2194
use super::*;
use crate::accounts_index::RefCount;
use crate::append_vec::AccountMeta;
use crate::{accounts_index::RefCount, append_vec::AccountMeta};
use assert_matches::assert_matches;
use bincode::serialize_into;
use rand::{thread_rng, Rng};
use solana_sdk::{account::Account, hash::HASH_BYTES};
use std::{fs, str::FromStr};
use tempfile::TempDir;
fn linear_ancestors(end_slot: u64) -> Ancestors {
let mut ancestors: Ancestors = vec![(0, 0)].into_iter().collect();
@ -2954,26 +2757,9 @@ pub mod tests {
}
fn reconstruct_accounts_db_via_serialization(accounts: &AccountsDB, slot: Slot) -> AccountsDB {
let mut writer = Cursor::new(vec![]);
let snapshot_storages = accounts.get_snapshot_storages(slot);
serialize_into(
&mut writer,
&AccountsDBSerialize::new(&accounts, slot, &snapshot_storages),
)
.unwrap();
let buf = writer.into_inner();
let mut reader = BufReader::new(&buf[..]);
let daccounts = AccountsDB::new(Vec::new());
let copied_accounts = TempDir::new().unwrap();
// Simulate obtaining a copy of the AppendVecs from a tarball
copy_append_vecs(&accounts, copied_accounts.path()).unwrap();
daccounts
.accounts_from_stream(&mut reader, copied_accounts.path())
.unwrap();
let daccounts =
crate::serde_snapshot::reconstruct_accounts_db_via_serialization(accounts, slot);
print_count_and_status("daccounts", &daccounts);
daccounts
}

View File

@ -1,4 +1,3 @@
use bincode::{deserialize_from, serialize_into};
use memmap::MmapMut;
use serde::{Deserialize, Serialize};
use solana_sdk::{
@ -8,10 +7,9 @@ use solana_sdk::{
pubkey::Pubkey,
};
use std::{
fmt,
fs::{remove_file, OpenOptions},
io,
io::{Cursor, Seek, SeekFrom, Write},
io::{Seek, SeekFrom, Write},
mem,
path::{Path, PathBuf},
sync::atomic::{AtomicUsize, Ordering},
@ -175,7 +173,7 @@ impl AppendVec {
}
#[allow(clippy::mutex_atomic)]
fn new_empty_map(current_len: usize) -> Self {
pub(crate) fn new_empty_map(current_len: usize) -> Self {
let map = MmapMut::map_anon(1).expect("failed to map the data file");
AppendVec {
@ -480,54 +478,6 @@ pub mod test_utils {
}
}
#[allow(clippy::mutex_atomic)]
impl Serialize for AppendVec {
    // Only `current_len` is serialized (as a u64 inside a bytes field); the
    // mmap contents travel separately as files in the snapshot archive.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        use serde::ser::Error;
        let len = std::mem::size_of::<usize>();
        let mut buf = vec![0u8; len];
        let mut wr = Cursor::new(&mut buf[..]);
        serialize_into(&mut wr, &(self.current_len.load(Ordering::Relaxed) as u64))
            .map_err(Error::custom)?;
        let len = wr.position() as usize;
        serializer.serialize_bytes(&wr.into_inner()[..len])
    }
}
struct AppendVecVisitor;
impl<'a> serde::de::Visitor<'a> for AppendVecVisitor {
type Value = AppendVec;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Expecting AppendVec")
}
fn visit_bytes<E>(self, data: &[u8]) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
use serde::de::Error;
let mut rd = Cursor::new(&data[..]);
let current_len: usize = deserialize_from(&mut rd).map_err(Error::custom)?;
// Note this does not initialize a valid Mmap in the AppendVec, needs to be done
// externally
Ok(AppendVec::new_empty_map(current_len))
}
}
impl<'de> Deserialize<'de> for AppendVec {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: ::serde::Deserializer<'de>,
{
deserializer.deserialize_bytes(AppendVecVisitor)
}
}
#[cfg(test)]
pub mod tests {
use super::test_utils::*;

View File

@ -7,7 +7,7 @@ use crate::{
AccountAddressFilter, Accounts, TransactionAccounts, TransactionLoadResult,
TransactionLoaders,
},
accounts_db::{AccountsDBSerialize, ErrorCounters, SnapshotStorage, SnapshotStorages},
accounts_db::{ErrorCounters, SnapshotStorages},
accounts_index::Ancestors,
blockhash_queue::BlockhashQueue,
builtin_programs::get_builtin_programs,
@ -21,7 +21,6 @@ use crate::{
transaction_batch::TransactionBatch,
transaction_utils::OrderedIterator,
};
use bincode::{deserialize_from, serialize_into};
use byteorder::{ByteOrder, LittleEndian};
use itertools::Itertools;
use log::*;
@ -59,17 +58,15 @@ use solana_vote_program::vote_state::VoteState;
use std::{
cell::RefCell,
collections::{HashMap, HashSet},
io::{BufReader, Cursor, Error as IOError, Read},
mem,
ops::RangeInclusive,
path::{Path, PathBuf},
path::PathBuf,
rc::Rc,
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::{Arc, RwLock, RwLockReadGuard},
};
pub const SECONDS_PER_YEAR: f64 = 365.25 * 24.0 * 60.0 * 60.0;
pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
pub const MAX_LEADER_SCHEDULE_STAKES: Epoch = 5;
@ -98,73 +95,27 @@ type EpochCount = u64;
#[derive(Default)]
pub struct BankRc {
/// where all the Accounts are stored
accounts: Arc<Accounts>,
pub accounts: Arc<Accounts>,
/// Previous checkpoint of this bank
parent: RwLock<Option<Arc<Bank>>>,
pub(crate) parent: RwLock<Option<Arc<Bank>>>,
/// Current slot
slot: Slot,
pub(crate) slot: Slot,
}
impl BankRc {
pub fn from_stream<R: Read, P: AsRef<Path>>(
account_paths: &[PathBuf],
slot: Slot,
ancestors: &Ancestors,
frozen_account_pubkeys: &[Pubkey],
mut stream: &mut BufReader<R>,
stream_append_vecs_path: P,
) -> std::result::Result<Self, IOError> {
let _len: usize =
deserialize_from(&mut stream).map_err(|e| BankRc::get_io_error(&e.to_string()))?;
let accounts = Accounts::from_stream(
account_paths,
ancestors,
frozen_account_pubkeys,
stream,
stream_append_vecs_path,
)?;
Ok(BankRc {
pub(crate) fn new(accounts: Accounts, slot: Slot) -> Self {
Self {
accounts: Arc::new(accounts),
parent: RwLock::new(None),
slot,
})
}
}
pub fn get_snapshot_storages(&self, slot: Slot) -> SnapshotStorages {
self.accounts.accounts_db.get_snapshot_storages(slot)
}
fn get_io_error(error: &str) -> IOError {
warn!("BankRc error: {:?}", error);
std::io::Error::new(std::io::ErrorKind::Other, error)
}
}
pub struct BankRcSerialize<'a, 'b> {
pub bank_rc: &'a BankRc,
pub snapshot_storages: &'b [SnapshotStorage],
}
impl<'a, 'b> Serialize for BankRcSerialize<'a, 'b> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
use serde::ser::Error;
let mut wr = Cursor::new(Vec::new());
let accounts_db_serialize = AccountsDBSerialize::new(
&*self.bank_rc.accounts.accounts_db,
self.bank_rc.slot,
self.snapshot_storages,
);
serialize_into(&mut wr, &accounts_db_serialize).map_err(Error::custom)?;
let len = wr.position() as usize;
serializer.serialize_bytes(&wr.into_inner()[..len])
}
}
#[derive(Default)]
@ -2586,31 +2537,16 @@ pub fn goto_end_of_slot(bank: &mut Bank) {
}
}
// This guards against possible memory exhaustions in bincode when restoring
// the full state from snapshot data files by imposing a fixed hard limit with
// ample of headrooms for such a usecase.
pub fn deserialize_from_snapshot<R, T>(reader: R) -> bincode::Result<T>
where
R: Read,
T: serde::de::DeserializeOwned,
{
bincode::config()
.limit(MAX_SNAPSHOT_DATA_FILE_SIZE)
.deserialize_from(reader)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
accounts_db::{get_temp_accounts_paths, tests::copy_append_vecs},
accounts_index::Ancestors,
genesis_utils::{
create_genesis_config_with_leader, GenesisConfigInfo, BOOTSTRAP_VALIDATOR_LAMPORTS,
},
status_cache::MAX_CACHE_ENTRIES,
};
use bincode::{serialize_into, serialized_size};
use solana_sdk::{
account::KeyedAccount,
account_utils::StateMut,
@ -2636,8 +2572,7 @@ mod tests {
vote_instruction,
vote_state::{self, Vote, VoteInit, VoteState, MAX_LOCKOUT_HISTORY},
};
use std::{io::Cursor, result, time::Duration};
use tempfile::TempDir;
use std::{result, time::Duration};
#[test]
fn test_hash_age_kind_is_durable_nonce() {
@ -5742,70 +5677,6 @@ mod tests {
assert!(bank.is_delta.load(Ordering::Relaxed));
}
#[test]
fn test_bank_serialize() {
solana_logger::setup();
let (genesis_config, _) = create_genesis_config(500);
let bank0 = Arc::new(Bank::new(&genesis_config));
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank0.squash();
// Create an account on a non-root fork
let key1 = Keypair::new();
bank1.deposit(&key1.pubkey(), 5);
let bank2 = Bank::new_from_parent(&bank0, &Pubkey::default(), 2);
// Test new account
let key2 = Keypair::new();
bank2.deposit(&key2.pubkey(), 10);
assert_eq!(bank2.get_balance(&key2.pubkey()), 10);
let key3 = Keypair::new();
bank2.deposit(&key3.pubkey(), 0);
bank2.squash();
let snapshot_storages = bank2.get_snapshot_storages();
let rc_serialize = BankRcSerialize {
bank_rc: &bank2.rc,
snapshot_storages: &snapshot_storages,
};
let len = serialized_size(&bank2).unwrap() + serialized_size(&rc_serialize).unwrap();
let mut buf = vec![0u8; len as usize];
let mut writer = Cursor::new(&mut buf[..]);
serialize_into(&mut writer, &bank2).unwrap();
serialize_into(&mut writer, &rc_serialize).unwrap();
let mut rdr = Cursor::new(&buf[..]);
let mut dbank: Bank = deserialize_from_snapshot(&mut rdr).unwrap();
let mut reader = BufReader::new(&buf[rdr.position() as usize..]);
// Create a new set of directories for this bank's accounts
let (_accounts_dir, dbank_paths) = get_temp_accounts_paths(4).unwrap();
let ref_sc = StatusCacheRc::default();
ref_sc.status_cache.write().unwrap().add_root(2);
// Create a directory to simulate AppendVecs unpackaged from a snapshot tar
let copied_accounts = TempDir::new().unwrap();
copy_append_vecs(&bank2.rc.accounts.accounts_db, copied_accounts.path()).unwrap();
dbank.set_bank_rc(
BankRc::from_stream(
&dbank_paths,
dbank.slot(),
&dbank.ancestors,
&[],
&mut reader,
copied_accounts.path(),
)
.unwrap(),
ref_sc,
);
assert_eq!(dbank.get_balance(&key1.pubkey()), 0);
assert_eq!(dbank.get_balance(&key2.pubkey()), 10);
assert_eq!(dbank.get_balance(&key3.pubkey()), 0);
bank2.compare_bank(&dbank);
}
#[test]
#[allow(clippy::float_cmp)]
fn test_check_point_value() {

View File

@ -14,6 +14,7 @@ pub mod message_processor;
mod native_loader;
pub mod nonce_utils;
pub mod rent_collector;
pub mod serde_snapshot;
pub mod stakes;
pub mod status_cache;
mod system_instruction_processor;

View File

@ -0,0 +1,295 @@
use {
crate::{
accounts::Accounts,
accounts_db::{
AccountStorageEntry, AccountStorageStatus, AccountsDB, AppendVecId, BankHashInfo,
},
append_vec::AppendVec,
bank::BankRc,
},
bincode::{deserialize_from, serialize_into},
fs_extra::dir::CopyOptions,
log::{info, warn},
rand::{thread_rng, Rng},
serde::{
de::{DeserializeOwned, Visitor},
Deserialize, Deserializer, Serialize, Serializer,
},
solana_sdk::clock::Slot,
std::{
cmp::min,
collections::HashMap,
fmt::{Formatter, Result as FormatResult},
io::{
BufReader, BufWriter, Cursor, Error as IoError, ErrorKind as IoErrorKind, Read, Write,
},
path::{Path, PathBuf},
result::Result,
sync::{atomic::Ordering, Arc},
},
};
mod future;
mod legacy;
mod tests;
mod utils;
use future::Context as TypeContextFuture;
use legacy::Context as TypeContextLegacy;
#[allow(unused_imports)]
use utils::{serialize_iter_as_map, serialize_iter_as_seq, serialize_iter_as_tuple};
// a number of test cases in accounts_db use this
#[cfg(test)]
pub(crate) use self::tests::reconstruct_accounts_db_via_serialization;
pub use crate::accounts_db::{SnapshotStorage, SnapshotStorages};
/// Selects which on-disk snapshot layout to read or write.
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum SerdeStyle {
    /// Current snapshot format (implemented in the `future` module).
    NEWER,
    /// Older snapshot format, kept so existing snapshots remain readable
    /// (implemented in the `legacy` module).
    OLDER,
}
const MAX_ACCOUNTS_DB_STREAM_SIZE: u64 = 32 * 1024 * 1024 * 1024;
/// The deserialized fields of an `AccountsDB`, generic over the per-version
/// storage-entry representation `T`:
/// (storage entries by slot, write_version, slot, bank hash info).
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct AccountDBFields<T>(HashMap<Slot, Vec<T>>, u64, Slot, BankHashInfo);
/// Per-snapshot-version serialization strategy.
///
/// Each snapshot format version (`future`/NEWER and `legacy`/OLDER) supplies
/// a `TypeContext` implementation that knows how to write the in-memory
/// `BankRc`/`AccountsDB` state in that version's on-disk layout and read it
/// back.
pub trait TypeContext<'a> {
    /// On-disk representation of an `AccountStorageEntry` for this version;
    /// must convert to and from the in-memory type.
    type SerializableAccountStorageEntry: Serialize
        + DeserializeOwned
        + From<&'a AccountStorageEntry>
        + Into<AccountStorageEntry>;

    /// Serialize the bank's accounts state (`BankRc` plus the captured
    /// snapshot storages) in this version's layout.
    fn serialize_bank_rc_fields<S: serde::ser::Serializer>(
        serializer: S,
        serializable_bank: &SerializableBankRc<'a, Self>,
    ) -> std::result::Result<S::Ok, S::Error>
    where
        Self: std::marker::Sized;

    /// Serialize the `AccountsDB` fields in this version's layout.
    fn serialize_accounts_db_fields<S: serde::ser::Serializer>(
        serializer: S,
        serializable_db: &SerializableAccountsDB<'a, Self>,
    ) -> std::result::Result<S::Ok, S::Error>
    where
        Self: std::marker::Sized;

    /// Deserialize the `AccountsDB` fields from `stream` in this version's
    /// layout.
    fn deserialize_accounts_db_fields<R>(
        stream: &mut BufReader<R>,
    ) -> Result<AccountDBFields<Self::SerializableAccountStorageEntry>, IoError>
    where
        R: Read;

    // we might define fn (de)serialize_bank(...) -> Result<Bank,...> for versionized bank serialization in the future
}
fn bankrc_to_io_error<T: ToString>(error: T) -> IoError {
let msg = error.to_string();
warn!("BankRc error: {:?}", msg);
IoError::new(IoErrorKind::Other, msg)
}
fn accountsdb_to_io_error<T: ToString>(error: T) -> IoError {
let msg = error.to_string();
warn!("AccountsDB error: {:?}", msg);
IoError::new(IoErrorKind::Other, msg)
}
/// Deserialize a `BankRc` from `stream` in the given snapshot format.
///
/// `stream_append_vecs_path` is the directory holding the AppendVec files
/// unpacked from the snapshot archive; restored AppendVecs are relocated
/// into `account_paths`.
pub fn bankrc_from_stream<R, P>(
    serde_style: SerdeStyle,
    account_paths: &[PathBuf],
    slot: Slot,
    stream: &mut BufReader<R>,
    stream_append_vecs_path: P,
) -> std::result::Result<BankRc, IoError>
where
    R: Read,
    P: AsRef<Path>,
{
    // Expands to the full deserialize-and-wrap expression for one concrete
    // TypeContext; a macro (rather than a helper fn) keeps the per-version
    // type parameter out of this function's signature.
    macro_rules! INTO {
        ($x:ident) => {
            Ok(BankRc::new(
                Accounts::new_empty(context_accountsdb_from_fields::<$x, P>(
                    $x::deserialize_accounts_db_fields(stream)?,
                    account_paths,
                    stream_append_vecs_path,
                )?),
                slot,
            ))
        };
    }
    match serde_style {
        SerdeStyle::NEWER => INTO!(TypeContextFuture),
        SerdeStyle::OLDER => INTO!(TypeContextLegacy),
    }
}
/// Serialize `bank_rc` (together with the pre-captured `snapshot_storages`)
/// to `stream` in the requested snapshot format.
pub fn bankrc_to_stream<W>(
    serde_style: SerdeStyle,
    stream: &mut BufWriter<W>,
    bank_rc: &BankRc,
    snapshot_storages: &[SnapshotStorage],
) -> Result<(), IoError>
where
    W: Write,
{
    // Expands to the serialize call for one concrete TypeContext.
    macro_rules! INTO {
        ($x:ident) => {
            serialize_into(
                stream,
                &SerializableBankRc::<$x> {
                    bank_rc,
                    snapshot_storages,
                    phantom: std::marker::PhantomData::default(),
                },
            )
            .map_err(bankrc_to_io_error)
        };
    }
    match serde_style {
        SerdeStyle::NEWER => INTO!(TypeContextFuture),
        SerdeStyle::OLDER => INTO!(TypeContextLegacy),
    }
}
/// Serialization wrapper pairing a `BankRc` with the snapshot storages to
/// write; the concrete on-disk layout is chosen by the `TypeContext`
/// parameter `C`.
pub struct SerializableBankRc<'a, C> {
    bank_rc: &'a BankRc,
    snapshot_storages: &'a [SnapshotStorage],
    phantom: std::marker::PhantomData<C>,
}
impl<'a, C: TypeContext<'a>> Serialize for SerializableBankRc<'a, C> {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        // Delegate to the version-specific field serializer.
        C::serialize_bank_rc_fields(serializer, self)
    }
}
/// Serialization wrapper for an `AccountsDB` snapshot at `slot`; the
/// concrete on-disk layout is chosen by the `TypeContext` parameter `C`.
pub struct SerializableAccountsDB<'a, C> {
    accounts_db: &'a AccountsDB,
    slot: Slot,
    account_storage_entries: &'a [SnapshotStorage],
    phantom: std::marker::PhantomData<C>,
}
impl<'a, C: TypeContext<'a>> Serialize for SerializableAccountsDB<'a, C> {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        // Delegate to the version-specific field serializer.
        C::serialize_accounts_db_fields(serializer, self)
    }
}
/// Rebuild an `AccountsDB` from deserialized snapshot fields.
///
/// Each AppendVec file is moved (or copied, if a cross-filesystem rename
/// fails) from `stream_append_vecs_path` into one of `account_paths`, the
/// storage entries are re-pointed at the new locations, and the accounts
/// index is regenerated.
fn context_accountsdb_from_fields<'a, C, P>(
    account_db_fields: AccountDBFields<C::SerializableAccountStorageEntry>,
    account_paths: &[PathBuf],
    stream_append_vecs_path: P,
) -> Result<AccountsDB, IoError>
where
    C: TypeContext<'a>,
    P: AsRef<Path>,
{
    let accounts_db = AccountsDB::new(account_paths.to_vec());

    let AccountDBFields(storage, version, slot, bank_hash_info) = account_db_fields;

    // Convert to a two-level map of slot -> id -> account storage entry.
    let storage = {
        let mut map = HashMap::new();
        for (slot, entries) in storage.into_iter() {
            let sub_map = map.entry(slot).or_insert_with(HashMap::new);
            for entry in entries.into_iter() {
                let mut entry: AccountStorageEntry = entry.into();
                entry.slot = slot;
                sub_map.insert(entry.id, Arc::new(entry));
            }
        }
        map
    };

    // Remap the deserialized AppendVec paths to point to correct local paths.
    let mut storage = storage
        .into_iter()
        .map(|(slot, mut slot_storage)| {
            let mut new_slot_storage = HashMap::new();
            for (id, storage_entry) in slot_storage.drain() {
                // Scatter the AppendVecs across the configured paths at random.
                let path_index = thread_rng().gen_range(0, accounts_db.paths.len());
                let local_dir = &accounts_db.paths[path_index];

                std::fs::create_dir_all(local_dir).expect("Create directory failed");

                // Move the corresponding AppendVec from the snapshot into the
                // directory pointed at by `local_dir`.
                let append_vec_relative_path =
                    AppendVec::new_relative_path(slot, storage_entry.id);
                let append_vec_abs_path = stream_append_vecs_path
                    .as_ref()
                    .join(&append_vec_relative_path);
                let target = local_dir.join(append_vec_abs_path.file_name().unwrap());
                // rename() takes the paths by reference, so no clone is needed.
                if std::fs::rename(&append_vec_abs_path, target).is_err() {
                    // rename() can fail (e.g. across filesystems); fall back to
                    // fs_extra's copying move. If that also fails, log and skip
                    // the entry (best effort).
                    let mut copy_options = CopyOptions::new();
                    copy_options.overwrite = true;
                    let result =
                        fs_extra::move_items(&[&append_vec_abs_path], local_dir, &copy_options)
                            .map_err(|e| {
                                format!(
                                    "unable to move {:?} to {:?}: {}",
                                    append_vec_abs_path, local_dir, e
                                )
                            })
                            .map_err(accountsdb_to_io_error);
                    if result.is_err() {
                        info!("{:?}", result);
                        continue;
                    }
                };

                // Notify the AppendVec of the new file location.
                let local_path = local_dir.join(append_vec_relative_path);
                let mut u_storage_entry = Arc::try_unwrap(storage_entry).unwrap();
                u_storage_entry
                    .set_file(local_path)
                    .map_err(accountsdb_to_io_error)?;
                new_slot_storage.insert(id, Arc::new(u_storage_entry));
            }
            Ok((slot, new_slot_storage))
        })
        .collect::<Result<HashMap<Slot, _>, IoError>>()?;

    // Discard any slots with no storage entries: this can happen if a non-root
    // slot was serialized, but non-root stores should not be in the snapshot.
    storage.retain(|_slot, stores| !stores.is_empty());

    accounts_db
        .bank_hashes
        .write()
        .unwrap()
        .insert(slot, bank_hash_info);

    // Seed next_id past the largest restored id so freshly created storages
    // do not collide with the ones just restored.
    let max_id: usize = *storage
        .values()
        .flat_map(HashMap::keys)
        .max()
        .expect("At least one storage entry must exist from deserializing stream");

    {
        let mut stores = accounts_db.storage.write().unwrap();
        stores.0.extend(storage);
    }

    accounts_db.next_id.store(max_id + 1, Ordering::Relaxed);
    accounts_db
        .write_version
        .fetch_add(version, Ordering::Relaxed);
    accounts_db.generate_index();
    Ok(accounts_db)
}

View File

@ -0,0 +1,101 @@
use {super::*, solana_measure::measure::Measure, std::cell::RefCell};
// Serializable version of AccountStorageEntry for snapshot format.
// Only the id and the valid data length are persisted; the file contents
// travel separately as AppendVec files in the snapshot archive.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub(super) struct SerializableAccountStorageEntry {
    id: AppendVecId,
    accounts_current_len: usize,
}

impl From<&AccountStorageEntry> for SerializableAccountStorageEntry {
    fn from(rhs: &AccountStorageEntry) -> Self {
        Self {
            id: rhs.id,
            accounts_current_len: rhs.accounts.len(),
        }
    }
}

// Implementing `From` (rather than a hand-written `Into`) provides `Into`
// for free via the blanket impl, still satisfying the
// `Into<AccountStorageEntry>` bound on `TypeContext`, and is the form
// clippy's `from_over_into` lint expects.
impl From<SerializableAccountStorageEntry> for AccountStorageEntry {
    fn from(rhs: SerializableAccountStorageEntry) -> Self {
        // The entry has no valid mmap yet; the backing file is attached later
        // via set_file() once the AppendVec has been moved into place.
        AccountStorageEntry::new_empty_map(rhs.id, rhs.accounts_current_len)
    }
}
pub(super) struct Context {}
impl<'a> TypeContext<'a> for Context {
type SerializableAccountStorageEntry = SerializableAccountStorageEntry;
fn serialize_bank_rc_fields<S: serde::ser::Serializer>(
serializer: S,
serializable_bank: &SerializableBankRc<'a, Self>,
) -> std::result::Result<S::Ok, S::Error>
where
Self: std::marker::Sized,
{
let accounts_db_serialize = SerializableAccountsDB::<'a, Self> {
accounts_db: &*serializable_bank.bank_rc.accounts.accounts_db,
slot: serializable_bank.bank_rc.slot,
account_storage_entries: serializable_bank.snapshot_storages,
phantom: std::marker::PhantomData::default(),
};
accounts_db_serialize.serialize(serializer)
}
fn serialize_accounts_db_fields<S: serde::ser::Serializer>(
serializer: S,
serializable_db: &SerializableAccountsDB<'a, Self>,
) -> std::result::Result<S::Ok, S::Error>
where
Self: std::marker::Sized,
{
// sample write version before serializing storage entries
let version = serializable_db
.accounts_db
.write_version
.load(Ordering::Relaxed);
// (1st of 3 elements) write the list of account storage entry lists out as a map
let entry_count = RefCell::<usize>::new(0);
let entries =
serialize_iter_as_map(serializable_db.account_storage_entries.iter().map(|x| {
*entry_count.borrow_mut() += x.len();
(
x.first().unwrap().slot,
serialize_iter_as_seq(
x.iter()
.map(|x| Self::SerializableAccountStorageEntry::from(x.as_ref())),
),
)
}));
let slot = serializable_db.slot;
let hash = serializable_db
.accounts_db
.bank_hashes
.read()
.unwrap()
.get(&serializable_db.slot)
.unwrap_or_else(|| panic!("No bank_hashes entry for slot {}", serializable_db.slot))
.clone();
let mut serialize_account_storage_timer = Measure::start("serialize_account_storage_ms");
let result = (entries, version, slot, hash).serialize(serializer);
serialize_account_storage_timer.stop();
datapoint_info!(
"serialize_account_storage_ms",
("duration", serialize_account_storage_timer.as_ms(), i64),
("num_entries", *entry_count.borrow(), i64),
);
result
}
fn deserialize_accounts_db_fields<R>(
mut stream: &mut BufReader<R>,
) -> Result<AccountDBFields<Self::SerializableAccountStorageEntry>, IoError>
where
R: Read,
{
deserialize_from(&mut stream).map_err(accountsdb_to_io_error)
}
}

View File

@ -0,0 +1,209 @@
use {super::*, solana_measure::measure::Measure, std::cell::RefCell};
// Serializable version of AccountStorageEntry for snapshot format
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub(super) struct SerializableAccountStorageEntry {
    id: AppendVecId,
    accounts: SerializableAppendVec,
    // Written for wire-format compatibility only; it is left at its default
    // on serialization and ignored when converting back.
    count_and_status: (usize, AccountStorageStatus),
}

impl From<&AccountStorageEntry> for SerializableAccountStorageEntry {
    fn from(rhs: &AccountStorageEntry) -> Self {
        Self {
            id: rhs.id,
            accounts: SerializableAppendVec::from(&rhs.accounts),
            // count_and_status deliberately defaulted (see field comment)
            ..Self::default()
        }
    }
}

// Implementing `From` (rather than a hand-written `Into`) provides `Into`
// via the blanket impl, still satisfying the `Into<AccountStorageEntry>`
// bound on `TypeContext` (clippy: from_over_into).
impl From<SerializableAccountStorageEntry> for AccountStorageEntry {
    fn from(rhs: SerializableAccountStorageEntry) -> Self {
        // No valid mmap yet; the backing file is attached later via set_file().
        AccountStorageEntry::new_empty_map(rhs.id, rhs.accounts.current_len)
    }
}
// Serializable version of AppendVec for snapshot format
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct SerializableAppendVec {
    // valid data length of the AppendVec (AppendVec::len()) at serialization time
    current_len: usize,
}
impl From<&AppendVec> for SerializableAppendVec {
    fn from(rhs: &AppendVec) -> SerializableAppendVec {
        SerializableAppendVec {
            current_len: rhs.len(),
        }
    }
}
// Restores only the length; the AppendVec's map is created empty and the
// real file is attached later, externally.
impl Into<AppendVec> for SerializableAppendVec {
    fn into(self) -> AppendVec {
        AppendVec::new_empty_map(self.current_len)
    }
}
// Serialization of AppendVec requires serialization of u64 to
// eight byte vector which is then itself serialized to the stream
impl Serialize for SerializableAppendVec {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // NOTE(review): assumes size_of::<usize>() == 8 (64-bit target); on a
        // 32-bit target the 8-byte bincode u64 would not fit in `buf` — confirm
        // the build only targets 64-bit.
        const LEN: usize = std::mem::size_of::<usize>();
        let mut buf = [0u8; LEN];
        serialize_into(Cursor::new(&mut buf[..]), &(self.current_len as u64))
            .map_err(serde::ser::Error::custom)?;
        serializer.serialize_bytes(&buf)
    }
}
// Deserialization of AppendVec requires deserialization
// of eight byte vector from which u64 is then deserialized
impl<'de> Deserialize<'de> for SerializableAppendVec {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;
        struct SerializableAppendVecVisitor;
        impl<'a> Visitor<'a> for SerializableAppendVecVisitor {
            type Value = SerializableAppendVec;
            fn expecting(&self, formatter: &mut Formatter) -> FormatResult {
                formatter.write_str("Expecting SerializableAppendVec")
            }
            fn visit_bytes<E>(self, data: &[u8]) -> std::result::Result<Self::Value, E>
            where
                E: Error,
            {
                // NOTE(review): like the Serialize impl, this assumes
                // size_of::<usize>() == 8 so that the u64 occupies exactly
                // LEN bytes of `data`.
                const LEN: u64 = std::mem::size_of::<usize>() as u64;
                let mut rd = Cursor::new(&data[..]);
                let current_len: usize = deserialize_from(&mut rd).map_err(Error::custom)?;
                // Reject trailing/short payloads: the byte blob must contain
                // exactly one u64.
                if rd.position() != LEN {
                    Err(Error::custom("SerializableAppendVec: unexpected length"))
                } else {
                    Ok(SerializableAppendVec { current_len })
                }
            }
        }
        deserializer.deserialize_bytes(SerializableAppendVecVisitor)
    }
}
/// `TypeContext` for the pre-existing ("legacy"/OLDER) snapshot format.
/// Field order below IS the wire format — do not reorder.
pub(super) struct Context {}
impl<'a> TypeContext<'a> for Context {
    type SerializableAccountStorageEntry = SerializableAccountStorageEntry;

    fn serialize_bank_rc_fields<S: serde::ser::Serializer>(
        serializer: S,
        serializable_bank: &SerializableBankRc<'a, Self>,
    ) -> std::result::Result<S::Ok, S::Error>
    where
        Self: std::marker::Sized,
    {
        // as there is no deserialize_bank_rc_fields(), do not emit the u64
        // size field here and have serialize_accounts_db_fields() emit two
        // u64 size fields instead
        SerializableAccountsDB::<'a, Self> {
            accounts_db: &*serializable_bank.bank_rc.accounts.accounts_db,
            slot: serializable_bank.bank_rc.slot,
            account_storage_entries: serializable_bank.snapshot_storages,
            phantom: std::marker::PhantomData::default(),
        }
        .serialize(serializer)
    }

    fn serialize_accounts_db_fields<S: serde::ser::Serializer>(
        serializer: S,
        serializable_db: &SerializableAccountsDB<'a, Self>,
    ) -> std::result::Result<S::Ok, S::Error>
    where
        Self: std::marker::Sized,
    {
        // sample write version before serializing storage entries
        let version = serializable_db
            .accounts_db
            .write_version
            .load(Ordering::Relaxed);

        // (1st of 3 elements) write the list of account storage entry lists out as a map
        // Built lazily so no temporary Vec/HashMap is materialized.
        let entry_count = RefCell::<usize>::new(0);
        let entries =
            serialize_iter_as_map(serializable_db.account_storage_entries.iter().map(|x| {
                *entry_count.borrow_mut() += x.len();
                (
                    x.first().unwrap().slot,
                    serialize_iter_as_seq(
                        x.iter()
                            .map(|x| Self::SerializableAccountStorageEntry::from(x.as_ref())),
                    ),
                )
            }));

        // (3rd of 3 elements) the (slot, bank hashes) pair
        let slot_hash = (
            serializable_db.slot,
            serializable_db
                .accounts_db
                .bank_hashes
                .read()
                .unwrap()
                .get(&serializable_db.slot)
                .unwrap_or_else(|| panic!("No bank_hashes entry for slot {}", serializable_db.slot))
                .clone(),
        );

        let mut serialize_account_storage_timer = Measure::start("serialize_account_storage_ms");
        // The two leading u64s stand in for the byte-vector lengths the old
        // format carried (see serialize_bank_rc_fields above); the reader
        // only uses them to bound allocations, so the max value is written.
        let result = (
            &MAX_ACCOUNTS_DB_STREAM_SIZE,
            &MAX_ACCOUNTS_DB_STREAM_SIZE,
            &entries,
            &version,
            &slot_hash,
        )
            .serialize(serializer);
        serialize_account_storage_timer.stop();
        datapoint_info!(
            "serialize_account_storage_ms",
            ("duration", serialize_account_storage_timer.as_ms(), i64),
            ("num_entries", *entry_count.borrow(), i64),
        );
        result
    }

    fn deserialize_accounts_db_fields<R>(
        mut stream: &mut BufReader<R>,
    ) -> Result<AccountDBFields<Self::SerializableAccountStorageEntry>, IoError>
    where
        R: Read,
    {
        // read and discard two u64 byte vector lengths, clamping each to the
        // hard cap so a corrupt value cannot drive a huge allocation below
        let serialized_len = MAX_ACCOUNTS_DB_STREAM_SIZE;
        let serialized_len = min(
            serialized_len,
            deserialize_from(&mut stream).map_err(accountsdb_to_io_error)?,
        );
        let serialized_len = min(
            serialized_len,
            deserialize_from(&mut stream).map_err(accountsdb_to_io_error)?,
        );

        // (1st of 3 elements) read in map of slots to account storage entries
        let storage: HashMap<Slot, Vec<Self::SerializableAccountStorageEntry>> = bincode::config()
            .limit(serialized_len)
            .deserialize_from(&mut stream)
            .map_err(accountsdb_to_io_error)?;

        // (2nd of 3 elements) read in write version
        let version: u64 = deserialize_from(&mut stream)
            .map_err(|e| format!("write version deserialize error: {}", e.to_string()))
            .map_err(accountsdb_to_io_error)?;

        // (3rd of 3 elements) read in (slot, bank hashes) pair
        let (slot, bank_hash_info): (Slot, BankHashInfo) = deserialize_from(&mut stream)
            .map_err(|e| format!("bank hashes deserialize error: {}", e.to_string()))
            .map_err(accountsdb_to_io_error)?;

        Ok(AccountDBFields(storage, version, slot, bank_hash_info))
    }
}

View File

@ -0,0 +1,282 @@
#[cfg(test)]
use {
super::*,
crate::{
accounts::{create_test_accounts, Accounts},
accounts_db::get_temp_accounts_paths,
bank::{Bank, StatusCacheRc},
},
rand::{thread_rng, Rng},
solana_sdk::{
account::Account,
clock::Slot,
genesis_config::create_genesis_config,
pubkey::Pubkey,
signature::{Keypair, Signer},
},
std::io::{BufReader, Cursor},
tempfile::TempDir,
};
/// Copy every AppendVec file backing `accounts_db` into `output_dir`,
/// simulating the extraction of AppendVecs from a snapshot archive.
#[cfg(test)]
fn copy_append_vecs<P: AsRef<Path>>(
    accounts_db: &AccountsDB,
    output_dir: P,
) -> std::io::Result<()> {
    let dest_dir = output_dir.as_ref();
    // Slot::max_value() captures the storages for every rooted slot.
    accounts_db
        .get_snapshot_storages(Slot::max_value())
        .iter()
        .flatten()
        .try_for_each(|entry| {
            let source = entry.get_path();
            let file_name = source.file_name().expect("Invalid AppendVec file path");
            std::fs::copy(&source, dest_dir.join(file_name)).map(|_| ())
        })
}
/// Spot-check randomly chosen accounts created by `create_test_accounts`:
/// the pubkey at index `idx` is expected to hold `idx + 1` lamports.
#[cfg(test)]
fn check_accounts(accounts: &Accounts, pubkeys: &[Pubkey], num: usize) {
    for _ in 1..num {
        let idx = thread_rng().gen_range(0, num - 1);
        let ancestors = vec![(0, 0)].into_iter().collect();
        let loaded = accounts.load_slow(&ancestors, &pubkeys[idx]);
        let expected = Account::new((idx + 1) as u64, 0, &Account::default().owner);
        assert_eq!(loaded, Some((expected, 0)));
    }
}
/// Deserialize an `AccountsDB` directly from `stream` using the versioned
/// context `C` (test helper; production goes through `bankrc_from_stream`).
#[cfg(test)]
fn context_accountsdb_from_stream<'a, C, R, P>(
    stream: &mut BufReader<R>,
    account_paths: &[PathBuf],
    stream_append_vecs_path: P,
) -> Result<AccountsDB, IoError>
where
    C: TypeContext<'a>,
    R: Read,
    P: AsRef<Path>,
{
    // read and deserialise the accounts database directly from the stream
    context_accountsdb_from_fields::<C, P>(
        C::deserialize_accounts_db_fields(stream)?,
        account_paths,
        stream_append_vecs_path,
    )
}
/// Deserialize an `AccountsDB` from `stream`, dispatching on `serde_style`
/// to the matching versioned `TypeContext` (test helper).
#[cfg(test)]
fn accountsdb_from_stream<R, P>(
    serde_style: SerdeStyle,
    stream: &mut BufReader<R>,
    account_paths: &[PathBuf],
    stream_append_vecs_path: P,
) -> Result<AccountsDB, IoError>
where
    R: Read,
    P: AsRef<Path>,
{
    match serde_style {
        SerdeStyle::NEWER => context_accountsdb_from_stream::<TypeContextFuture, R, P>(
            stream,
            account_paths,
            stream_append_vecs_path,
        ),
        SerdeStyle::OLDER => context_accountsdb_from_stream::<TypeContextLegacy, R, P>(
            stream,
            account_paths,
            stream_append_vecs_path,
        ),
    }
}
/// Serialize `accounts_db` to `stream` in the requested snapshot format
/// (test helper; production serialization goes through `bankrc_to_stream`).
#[cfg(test)]
fn accountsdb_to_stream<W>(
    serde_style: SerdeStyle,
    stream: &mut W,
    accounts_db: &AccountsDB,
    slot: Slot,
    account_storage_entries: &[SnapshotStorage],
) -> Result<(), IoError>
where
    W: Write,
{
    // Errors are mapped with accountsdb_to_io_error (not bankrc_to_io_error,
    // which would mislabel an AccountsDB failure as a "BankRc error" in logs).
    match serde_style {
        SerdeStyle::NEWER => serialize_into(
            stream,
            &SerializableAccountsDB::<TypeContextFuture> {
                accounts_db,
                slot,
                account_storage_entries,
                phantom: std::marker::PhantomData::default(),
            },
        )
        .map_err(accountsdb_to_io_error),
        SerdeStyle::OLDER => serialize_into(
            stream,
            &SerializableAccountsDB::<TypeContextLegacy> {
                accounts_db,
                slot,
                account_storage_entries,
                phantom: std::marker::PhantomData::default(),
            },
        )
        .map_err(accountsdb_to_io_error),
    }
}
/// Round-trip 100 generated test accounts through `accountsdb_to_stream` /
/// `accountsdb_from_stream` in the given format, then verify balances and
/// the bank hash survive.
#[cfg(test)]
fn test_accounts_serialize_style(serde_style: SerdeStyle) {
    solana_logger::setup();
    let (_accounts_dir, paths) = get_temp_accounts_paths(4).unwrap();
    let accounts = Accounts::new(paths);

    let mut pubkeys: Vec<Pubkey> = vec![];
    create_test_accounts(&accounts, &mut pubkeys, 100, 0);
    check_accounts(&accounts, &pubkeys, 100);
    accounts.add_root(0);

    let mut writer = Cursor::new(vec![]);
    accountsdb_to_stream(
        serde_style,
        &mut writer,
        &*accounts.accounts_db,
        0,
        &accounts.accounts_db.get_snapshot_storages(0),
    )
    .unwrap();

    let copied_accounts = TempDir::new().unwrap();
    // Simulate obtaining a copy of the AppendVecs from a tarball
    copy_append_vecs(&accounts.accounts_db, copied_accounts.path()).unwrap();

    let buf = writer.into_inner();
    let mut reader = BufReader::new(&buf[..]);
    // Deserialize into a distinct set of account paths to prove the restore
    // does not depend on the original directories.
    let (_accounts_dir, daccounts_paths) = get_temp_accounts_paths(2).unwrap();
    let daccounts = Accounts::new_empty(
        accountsdb_from_stream(
            serde_style,
            &mut reader,
            &daccounts_paths,
            copied_accounts.path(),
        )
        .unwrap(),
    );
    check_accounts(&daccounts, &pubkeys, 100);
    assert_eq!(accounts.bank_hash_at(0), daccounts.bank_hash_at(0));
}
/// Serialize a bank (bincode bank fields followed by the BankRc in the given
/// snapshot format), deserialize into a fresh bank over new account paths,
/// and verify balances and overall bank state match.
#[cfg(test)]
fn test_bank_serialize_style(serde_style: SerdeStyle) {
    solana_logger::setup();
    let (genesis_config, _) = create_genesis_config(500);
    let bank0 = Arc::new(Bank::new(&genesis_config));
    let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
    bank0.squash();

    // Create an account on a non-root fork
    let key1 = Keypair::new();
    bank1.deposit(&key1.pubkey(), 5);

    let bank2 = Bank::new_from_parent(&bank0, &Pubkey::default(), 2);

    // Test new account
    let key2 = Keypair::new();
    bank2.deposit(&key2.pubkey(), 10);
    assert_eq!(bank2.get_balance(&key2.pubkey()), 10);

    let key3 = Keypair::new();
    bank2.deposit(&key3.pubkey(), 0);

    bank2.squash();

    let snapshot_storages = bank2.get_snapshot_storages();
    let mut buf = vec![];
    let mut writer = Cursor::new(&mut buf);
    serialize_into(&mut writer, &bank2).unwrap();
    // NOTE(review): the temporary BufWriter is dropped at the end of this
    // statement, which flushes its buffer into `writer`; flush errors on drop
    // are swallowed — acceptable in a test, confirm for any production reuse.
    crate::serde_snapshot::bankrc_to_stream(
        serde_style,
        &mut std::io::BufWriter::new(&mut writer),
        &bank2.rc,
        &snapshot_storages,
    )
    .unwrap();

    // Read back the bank fields, then hand the remainder of the buffer to the
    // BankRc deserializer.
    let mut rdr = Cursor::new(&buf[..]);
    let mut dbank: Bank = bincode::deserialize_from(&mut rdr).unwrap();
    let mut reader = std::io::BufReader::new(&buf[rdr.position() as usize..]);

    // Create a new set of directories for this bank's accounts
    let (_accounts_dir, dbank_paths) = get_temp_accounts_paths(4).unwrap();
    let ref_sc = StatusCacheRc::default();
    ref_sc.status_cache.write().unwrap().add_root(2);
    // Create a directory to simulate AppendVecs unpackaged from a snapshot tar
    let copied_accounts = TempDir::new().unwrap();
    copy_append_vecs(&bank2.rc.accounts.accounts_db, copied_accounts.path()).unwrap();
    dbank.set_bank_rc(
        crate::serde_snapshot::bankrc_from_stream(
            serde_style,
            &dbank_paths,
            dbank.slot(),
            &mut reader,
            copied_accounts.path(),
        )
        .unwrap(),
        ref_sc,
    );
    // key1 was deposited on the non-root fork (bank1) and must not appear.
    assert_eq!(dbank.get_balance(&key1.pubkey()), 0);
    assert_eq!(dbank.get_balance(&key2.pubkey()), 10);
    assert_eq!(dbank.get_balance(&key3.pubkey()), 0);
    bank2.compare_bank(&dbank);
}
/// Round-trip `accounts` through the NEWER snapshot serializer and back,
/// returning the reconstructed `AccountsDB` (shared by accounts_db tests).
#[cfg(test)]
pub(crate) fn reconstruct_accounts_db_via_serialization(
    accounts: &AccountsDB,
    slot: Slot,
) -> AccountsDB {
    let mut writer = Cursor::new(vec![]);
    let snapshot_storages = accounts.get_snapshot_storages(slot);
    // `accounts` is already a reference; the previous `&accounts` double
    // borrows only worked through deref coercion.
    accountsdb_to_stream(
        SerdeStyle::NEWER,
        &mut writer,
        accounts,
        slot,
        &snapshot_storages,
    )
    .unwrap();

    let buf = writer.into_inner();
    let mut reader = BufReader::new(&buf[..]);
    let copied_accounts = TempDir::new().unwrap();
    // Simulate obtaining a copy of the AppendVecs from a tarball
    copy_append_vecs(accounts, copied_accounts.path()).unwrap();
    accountsdb_from_stream(SerdeStyle::NEWER, &mut reader, &[], copied_accounts.path()).unwrap()
}
// Exercise the round-trip helpers against both supported snapshot formats.
#[test]
fn test_accounts_serialize_newer() {
    test_accounts_serialize_style(SerdeStyle::NEWER)
}

#[test]
fn test_accounts_serialize_older() {
    test_accounts_serialize_style(SerdeStyle::OLDER)
}

#[test]
fn test_bank_serialize_newer() {
    test_bank_serialize_style(SerdeStyle::NEWER)
}

#[test]
fn test_bank_serialize_older() {
    test_bank_serialize_style(SerdeStyle::OLDER)
}

View File

@ -0,0 +1,107 @@
use serde::{
ser::{SerializeSeq, SerializeTuple},
Serialize, Serializer,
};
// consumes an iterator and returns an object that will serialize as a serde seq
#[allow(dead_code)]
pub fn serialize_iter_as_seq<I>(iter: I) -> impl Serialize
where
    I: IntoIterator,
    <I as IntoIterator>::Item: Serialize,
    <I as IntoIterator>::IntoIter: ExactSizeIterator,
{
    // One-shot adapter: Serialize::serialize takes &self, so the iterator is
    // parked in a Cell and moved out on the first (and only) serialization;
    // a second serialization attempt panics on the unwrap.
    struct SeqSerializer<I> {
        inner: std::cell::Cell<Option<I>>,
    }
    impl<I> Serialize for SeqSerializer<I>
    where
        I: IntoIterator,
        <I as IntoIterator>::Item: Serialize,
        <I as IntoIterator>::IntoIter: ExactSizeIterator,
    {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: Serializer,
        {
            let items = self.inner.take().unwrap().into_iter();
            // ExactSizeIterator lets us announce the length up front.
            let mut seq = serializer.serialize_seq(Some(items.len()))?;
            for element in items {
                seq.serialize_element(&element)?;
            }
            seq.end()
        }
    }
    SeqSerializer {
        inner: std::cell::Cell::new(Some(iter)),
    }
}
// consumes an iterator and returns an object that will serialize as a serde tuple
#[allow(dead_code)]
pub fn serialize_iter_as_tuple<I>(iter: I) -> impl Serialize
where
    I: IntoIterator,
    <I as IntoIterator>::Item: Serialize,
    <I as IntoIterator>::IntoIter: ExactSizeIterator,
{
    // One-shot adapter, same scheme as serialize_iter_as_seq: the iterator
    // is parked in a Cell and moved out on the first (and only) use.
    struct TupleSerializer<I> {
        inner: std::cell::Cell<Option<I>>,
    }
    impl<I> Serialize for TupleSerializer<I>
    where
        I: IntoIterator,
        <I as IntoIterator>::Item: Serialize,
        <I as IntoIterator>::IntoIter: ExactSizeIterator,
    {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: Serializer,
        {
            let items = self.inner.take().unwrap().into_iter();
            let mut tuple = serializer.serialize_tuple(items.len())?;
            for element in items {
                tuple.serialize_element(&element)?;
            }
            tuple.end()
        }
    }
    TupleSerializer {
        inner: std::cell::Cell::new(Some(iter)),
    }
}
// consumes a 2-tuple iterator and returns an object that will serialize as a serde map
#[allow(dead_code)]
pub fn serialize_iter_as_map<K, V, I>(iter: I) -> impl Serialize
where
    K: Serialize,
    V: Serialize,
    I: IntoIterator<Item = (K, V)>,
{
    // One-shot adapter, same scheme as the seq/tuple helpers: the iterator
    // is parked in a Cell and moved out on the first (and only) use.
    struct MapSerializer<I> {
        entries: std::cell::Cell<Option<I>>,
    }
    impl<K, V, I> Serialize for MapSerializer<I>
    where
        K: Serialize,
        V: Serialize,
        I: IntoIterator<Item = (K, V)>,
    {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: Serializer,
        {
            serializer.collect_map(self.entries.take().unwrap())
        }
    }
    MapSerializer {
        entries: std::cell::Cell::new(Some(iter)),
    }
}