diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index a3baff791..626fdb735 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -83,9 +83,7 @@ mod tests { snapshot_package::AccountsPackage, snapshot_utils::{self, SNAPSHOT_STATUS_CACHE_FILE_NAME}, }; - use solana_runtime::{ - accounts_db::AccountStorageEntry, bank::BankSlotDelta, bank::MAX_SNAPSHOT_DATA_FILE_SIZE, - }; + use solana_runtime::{accounts_db::AccountStorageEntry, bank::BankSlotDelta}; use solana_sdk::hash::Hash; use std::{ fs::{self, remove_dir_all, OpenOptions}, @@ -186,7 +184,6 @@ mod tests { let dummy_slot_deltas: Vec = vec![]; snapshot_utils::serialize_snapshot_data_file( &snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME), - MAX_SNAPSHOT_DATA_FILE_SIZE, |stream| { serialize_into(stream, &dummy_slot_deltas)?; Ok(()) diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs index e4174965c..0becfcad6 100644 --- a/core/tests/bank_forks.rs +++ b/core/tests/bank_forks.rs @@ -344,7 +344,6 @@ mod tests { &saved_snapshots_dir .path() .join(snapshot_utils::SNAPSHOT_STATUS_CACHE_FILE_NAME), - solana_runtime::bank::MAX_SNAPSHOT_DATA_FILE_SIZE, |stream| { serialize_into(stream, &dummy_slot_deltas)?; Ok(()) diff --git a/ledger/src/snapshot_utils.rs b/ledger/src/snapshot_utils.rs index 1c8b8b9a3..9028f4114 100644 --- a/ledger/src/snapshot_utils.rs +++ b/ledger/src/snapshot_utils.rs @@ -9,10 +9,9 @@ use log::*; use regex::Regex; use solana_measure::measure::Measure; use solana_runtime::{ - accounts_db::{SnapshotStorage, SnapshotStorages}, - bank::{ - self, deserialize_from_snapshot, Bank, BankRcSerialize, BankSlotDelta, - MAX_SNAPSHOT_DATA_FILE_SIZE, + bank::{Bank, BankSlotDelta}, + serde_snapshot::{ + bankrc_from_stream, bankrc_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages, }, }; use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey}; @@ -22,6 +21,8 @@ use std::{ io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, process::ExitStatus, + str::FromStr, + sync::Arc, }; use tar::Archive; use tempfile::TempDir; @@ -32,7 +33,47 @@ pub const TAR_SNAPSHOTS_DIR: &str = "snapshots"; pub const TAR_ACCOUNTS_DIR: &str = "accounts"; pub const TAR_VERSION_FILE: &str = "version"; -pub const SNAPSHOT_VERSION: &str = "1.1.0"; +const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB +const VERSION_STRING_V1_1_0: &str = "1.1.0"; +const VERSION_STRING_V1_2_0: &str = "1.2.0"; +const OUTPUT_SNAPSHOT_VERSION: SnapshotVersion = SnapshotVersion::V1_2_0; + +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum SnapshotVersion { + V1_1_0, + V1_2_0, +} + +impl From for &'static str { + fn from(snapshot_version: SnapshotVersion) -> &'static str { + match snapshot_version { + SnapshotVersion::V1_1_0 => VERSION_STRING_V1_1_0, + SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0, + } + } +} + +impl FromStr for SnapshotVersion { + type Err = &'static str; + + fn from_str(version_string: &str) -> std::result::Result { + match version_string { + VERSION_STRING_V1_1_0 => Ok(SnapshotVersion::V1_1_0), + VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0), + _ => Err("unsupported snapshot version"), + } + } +} + +impl SnapshotVersion { + pub fn as_str(self) -> &'static str { + <&str as From>::from(self) + } + + fn maybe_from_string(version_string: &str) -> Option { + version_string.parse::().ok() + } +} #[derive(PartialEq, Ord, Eq, Debug)] pub struct SlotSnapshotPaths { @@ -192,7 +233,7 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<() // Write version file { let mut f = std::fs::File::create(staging_version_file)?; - f.write_all(&SNAPSHOT_VERSION.to_string().into_bytes())?; + f.write_all(OUTPUT_SNAPSHOT_VERSION.as_str().as_bytes())?; } let (compression_option, file_ext) = get_compression_ext(&snapshot_package.compression); @@ -289,13 +330,35 @@ where } } -pub fn serialize_snapshot_data_file( +pub fn serialize_snapshot_data_file(data_file_path: &Path, serializer: F) -> Result +where + F: FnOnce(&mut BufWriter) -> Result<()>, +{ + serialize_snapshot_data_file_capped::( + data_file_path, + MAX_SNAPSHOT_DATA_FILE_SIZE, + serializer, + ) +} + +pub fn deserialize_snapshot_data_file(data_file_path: &Path, deserializer: F) -> Result +where + F: FnOnce(&mut BufReader) -> Result, +{ + deserialize_snapshot_data_file_capped::( + data_file_path, + MAX_SNAPSHOT_DATA_FILE_SIZE, + deserializer, + ) +} + +fn serialize_snapshot_data_file_capped( data_file_path: &Path, maximum_file_size: u64, - mut serializer: F, + serializer: F, ) -> Result where - F: FnMut(&mut BufWriter) -> Result<()>, + F: FnOnce(&mut BufWriter) -> Result<()>, { let data_file = File::create(data_file_path)?; let mut data_file_stream = BufWriter::new(data_file); @@ -313,13 +376,13 @@ where Ok(consumed_size) } -pub fn deserialize_snapshot_data_file( +fn deserialize_snapshot_data_file_capped( data_file_path: &Path, maximum_file_size: u64, - mut deserializer: F, + deserializer: F, ) -> Result where - F: FnMut(&mut BufReader) -> Result, + F: FnOnce(&mut BufReader) -> Result, { let file_size = fs::metadata(&data_file_path)?.len(); @@ -367,21 +430,18 @@ pub fn add_snapshot>( ); let mut bank_serialize = Measure::start("bank-serialize-ms"); - let consumed_size = serialize_snapshot_data_file( - &snapshot_bank_file_path, - MAX_SNAPSHOT_DATA_FILE_SIZE, - |stream| { - serialize_into(stream.by_ref(), &*bank)?; - serialize_into( - stream.by_ref(), - &BankRcSerialize { - bank_rc: &bank.rc, - snapshot_storages, - }, - )?; - Ok(()) - }, - )?; + let bank_snapshot_serializer = move |stream: &mut BufWriter| -> Result<()> { + serialize_into(stream.by_ref(), bank)?; + bankrc_to_stream( + SerdeStyle::NEWER, + stream.by_ref(), + &bank.rc, + snapshot_storages, + )?; + Ok(()) + }; + let consumed_size = + serialize_snapshot_data_file(&snapshot_bank_file_path, bank_snapshot_serializer)?; bank_serialize.stop(); // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE @@ -414,14 +474,10 @@ pub fn serialize_status_cache( snapshot_links.path().join(SNAPSHOT_STATUS_CACHE_FILE_NAME); let mut status_cache_serialize = Measure::start("status_cache_serialize-ms"); - let consumed_size = serialize_snapshot_data_file( - &snapshot_status_cache_file_path, - MAX_SNAPSHOT_DATA_FILE_SIZE, - |stream| { - serialize_into(stream, slot_deltas)?; - Ok(()) - }, - )?; + let consumed_size = serialize_snapshot_data_file(&snapshot_status_cache_file_path, |stream| { + serialize_into(stream, slot_deltas)?; + Ok(()) + })?; status_cache_serialize.stop(); // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE @@ -624,6 +680,13 @@ where { info!("snapshot version: {}", snapshot_version); + let snapshot_version_enum = + SnapshotVersion::maybe_from_string(snapshot_version).ok_or_else(|| { + get_io_error(&format!( + "unsupported snapshot version: {}", + snapshot_version + )) + })?; let mut snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir); if snapshot_paths.len() > 1 { return Err(get_io_error("invalid snapshot format")); @@ -633,45 +696,47 @@ where .ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?; info!("Loading bank from {:?}", &root_paths.snapshot_file_path); - let bank = deserialize_snapshot_data_file( - &root_paths.snapshot_file_path, - MAX_SNAPSHOT_DATA_FILE_SIZE, - |stream| { - let mut bank: Bank = match snapshot_version { - SNAPSHOT_VERSION => deserialize_from_snapshot(stream.by_ref())?, - _ => { - return Err(get_io_error(&format!( - "unsupported snapshot version: {}", - snapshot_version - ))); - } - }; - bank.operating_mode = Some(genesis_config.operating_mode); - info!("Rebuilding accounts..."); - let rc = bank::BankRc::from_stream( + let bank = deserialize_snapshot_data_file(&root_paths.snapshot_file_path, |mut stream| { + let mut bank: Bank = bincode::config() + .limit(MAX_SNAPSHOT_DATA_FILE_SIZE) + .deserialize_from(&mut stream)?; + + info!("Rebuilding accounts..."); + + let mut bankrc = match snapshot_version_enum { + SnapshotVersion::V1_1_0 => bankrc_from_stream( + SerdeStyle::OLDER, account_paths, bank.slot(), - &bank.ancestors, - frozen_account_pubkeys, - stream.by_ref(), + &mut stream, &append_vecs_path, - )?; - bank.set_bank_rc(rc, bank::StatusCacheRc::default()); - bank.finish_init(); - Ok(bank) - }, - )?; + ), + SnapshotVersion::V1_2_0 => bankrc_from_stream( + SerdeStyle::NEWER, + account_paths, + bank.slot(), + &mut stream, + &append_vecs_path, + ), + }?; + Arc::get_mut(&mut Arc::get_mut(&mut bankrc.accounts).unwrap().accounts_db) + .unwrap() + .freeze_accounts(&bank.ancestors, frozen_account_pubkeys); + + bank.rc = bankrc; + bank.operating_mode = Some(genesis_config.operating_mode); + bank.finish_init(); + Ok(bank) + })?; let status_cache_path = unpacked_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME); - let slot_deltas = deserialize_snapshot_data_file( - &status_cache_path, - MAX_SNAPSHOT_DATA_FILE_SIZE, - |stream| { - info!("Rebuilding status cache..."); - let slot_deltas: Vec = deserialize_from_snapshot(stream)?; - Ok(slot_deltas) - }, - )?; + let slot_deltas = deserialize_snapshot_data_file(&status_cache_path, |stream| { + info!("Rebuilding status cache..."); + let slot_deltas: Vec = bincode::config() + .limit(MAX_SNAPSHOT_DATA_FILE_SIZE) + .deserialize_from(stream)?; + Ok(slot_deltas) + })?; bank.src.append(&slot_deltas); @@ -726,7 +791,7 @@ mod tests { fn test_serialize_snapshot_data_file_under_limit() { let temp_dir = tempfile::TempDir::new().unwrap(); let expected_consumed_size = size_of::() as u64; - let consumed_size = serialize_snapshot_data_file( + let consumed_size = serialize_snapshot_data_file_capped( &temp_dir.path().join("data-file"), expected_consumed_size, |stream| { @@ -742,7 +807,7 @@ mod tests { fn test_serialize_snapshot_data_file_over_limit() { let temp_dir = tempfile::TempDir::new().unwrap(); let expected_consumed_size = size_of::() as u64; - let result = serialize_snapshot_data_file( + let result = serialize_snapshot_data_file_capped( &temp_dir.path().join("data-file"), expected_consumed_size - 1, |stream| { @@ -759,7 +824,7 @@ mod tests { let expected_consumed_size = size_of::() as u64; let temp_dir = tempfile::TempDir::new().unwrap(); - serialize_snapshot_data_file( + serialize_snapshot_data_file_capped( &temp_dir.path().join("data-file"), expected_consumed_size, |stream| { @@ -769,7 +834,7 @@ mod tests { ) .unwrap(); - let actual_data = deserialize_snapshot_data_file( + let actual_data = deserialize_snapshot_data_file_capped( &temp_dir.path().join("data-file"), expected_consumed_size, |stream| Ok(deserialize_from::<_, u32>(stream)?), @@ -784,7 +849,7 @@ mod tests { let expected_consumed_size = size_of::() as u64; let temp_dir = tempfile::TempDir::new().unwrap(); - serialize_snapshot_data_file( + serialize_snapshot_data_file_capped( &temp_dir.path().join("data-file"), expected_consumed_size, |stream| { @@ -794,7 +859,7 @@ mod tests { ) .unwrap(); - let result = deserialize_snapshot_data_file( + let result = deserialize_snapshot_data_file_capped( &temp_dir.path().join("data-file"), expected_consumed_size - 1, |stream| Ok(deserialize_from::<_, u32>(stream)?), @@ -808,7 +873,7 @@ mod tests { let expected_consumed_size = size_of::() as u64; let temp_dir = tempfile::TempDir::new().unwrap(); - serialize_snapshot_data_file( + serialize_snapshot_data_file_capped( &temp_dir.path().join("data-file"), expected_consumed_size * 2, |stream| { @@ -819,7 +884,7 @@ mod tests { ) .unwrap(); - let result = deserialize_snapshot_data_file( + let result = deserialize_snapshot_data_file_capped( &temp_dir.path().join("data-file"), expected_consumed_size * 2, |stream| Ok(deserialize_from::<_, u32>(stream)?), diff --git a/programs/bpf/tests/programs.rs b/programs/bpf/tests/programs.rs index dd59723a4..ae504a676 100644 --- a/programs/bpf/tests/programs.rs +++ b/programs/bpf/tests/programs.rs @@ -374,7 +374,12 @@ mod bpf { assert!(bank_client .send_message( - &[&mint_keypair, &argument_keypair, &invoked_argument_keypair, &from_keypair], + &[ + &mint_keypair, + &argument_keypair, + &invoked_argument_keypair, + &from_keypair + ], message, ) .is_ok()); diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 95e83dc5a..9e7c567a3 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -26,14 +26,13 @@ use solana_sdk::{ }; use std::{ collections::{HashMap, HashSet}, - io::{BufReader, Error as IOError, Read}, ops::RangeBounds, - path::{Path, PathBuf}, + path::PathBuf, sync::{Arc, Mutex, RwLock}, }; #[derive(Default, Debug)] -struct ReadonlyLock { +pub(crate) struct ReadonlyLock { lock_count: Mutex, } @@ -47,10 +46,10 @@ pub struct Accounts { pub accounts_db: Arc, /// set of writable accounts which are currently in the pipeline - account_locks: Mutex>, + pub(crate) account_locks: Mutex>, /// Set of read-only accounts which are currently in the pipeline, caching number of locks. - readonly_locks: Arc>>>, + pub(crate) readonly_locks: Arc>>>, } // for the load instructions @@ -66,6 +65,15 @@ pub enum AccountAddressFilter { } impl Accounts { + pub(crate) fn new_empty(accounts_db: AccountsDB) -> Self { + Self { + accounts_db: Arc::new(accounts_db), + account_locks: Mutex::new(HashSet::new()), + readonly_locks: Arc::new(RwLock::new(Some(HashMap::new()))), + ..Self::default() + } + } + pub fn new(paths: Vec) -> Self { Self::new_with_frozen_accounts(paths, &HashMap::default(), &[]) } @@ -102,25 +110,6 @@ impl Accounts { .freeze_accounts(ancestors, frozen_account_pubkeys); } - pub fn from_stream>( - account_paths: &[PathBuf], - ancestors: &Ancestors, - frozen_account_pubkeys: &[Pubkey], - stream: &mut BufReader, - stream_append_vecs_path: P, - ) -> std::result::Result { - let mut accounts_db = AccountsDB::new(account_paths.to_vec()); - accounts_db.accounts_from_stream(stream, stream_append_vecs_path)?; - accounts_db.freeze_accounts(ancestors, frozen_account_pubkeys); - - Ok(Accounts { - slot: 0, - accounts_db: Arc::new(accounts_db), - account_locks: Mutex::new(HashSet::new()), - readonly_locks: Arc::new(RwLock::new(Some(HashMap::new()))), - }) - } - /// Return true if the slice has any duplicate elements pub fn has_duplicates(xs: &[T]) -> bool { // Note: This is an O(n^2) algorithm, but requires no heap allocations. The benchmark @@ -787,16 +776,7 @@ mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; - use crate::{ - accounts_db::{ - tests::copy_append_vecs, - {get_temp_accounts_paths, AccountsDBSerialize}, - }, - bank::HashAgeKind, - rent_collector::RentCollector, - }; - use bincode::serialize_into; - use rand::{thread_rng, Rng}; + use crate::{bank::HashAgeKind, rent_collector::RentCollector}; use solana_sdk::{ account::Account, epoch_schedule::EpochSchedule, @@ -811,11 +791,9 @@ mod tests { transaction::Transaction, }; use std::{ - io::Cursor, sync::atomic::{AtomicBool, AtomicU64, Ordering}, {thread, time}, }; - use tempfile::TempDir; fn load_accounts_with_fee_and_rent( tx: Transaction, @@ -1472,61 +1450,6 @@ mod tests { accounts.bank_hash_at(1); } - fn check_accounts(accounts: &Accounts, pubkeys: &[Pubkey], num: usize) { - for _ in 1..num { - let idx = thread_rng().gen_range(0, num - 1); - let ancestors = vec![(0, 0)].into_iter().collect(); - let account = accounts.load_slow(&ancestors, &pubkeys[idx]); - let account1 = Some(( - Account::new((idx + 1) as u64, 0, &Account::default().owner), - 0, - )); - assert_eq!(account, account1); - } - } - - #[test] - fn test_accounts_serialize() { - solana_logger::setup(); - let (_accounts_dir, paths) = get_temp_accounts_paths(4).unwrap(); - let accounts = Accounts::new(paths); - - let mut pubkeys: Vec = vec![]; - create_test_accounts(&accounts, &mut pubkeys, 100, 0); - check_accounts(&accounts, &pubkeys, 100); - accounts.add_root(0); - - let mut writer = Cursor::new(vec![]); - serialize_into( - &mut writer, - &AccountsDBSerialize::new( - &*accounts.accounts_db, - 0, - &accounts.accounts_db.get_snapshot_storages(0), - ), - ) - .unwrap(); - - let copied_accounts = TempDir::new().unwrap(); - - // Simulate obtaining a copy of the AppendVecs from a tarball - copy_append_vecs(&accounts.accounts_db, copied_accounts.path()).unwrap(); - - let buf = writer.into_inner(); - let mut reader = BufReader::new(&buf[..]); - let (_accounts_dir, daccounts_paths) = get_temp_accounts_paths(2).unwrap(); - let daccounts = Accounts::from_stream( - &daccounts_paths, - &HashMap::default(), - &[], - &mut reader, - copied_accounts.path(), - ) - .unwrap(); - check_accounts(&daccounts, &pubkeys, 100); - assert_eq!(accounts.bank_hash_at(0), daccounts.bank_hash_at(0)); - } - #[test] fn test_accounts_locks() { let keypair0 = Keypair::new(); diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 697212c6c..63f3ec307 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -21,20 +21,13 @@ use crate::{ accounts_index::{AccountsIndex, Ancestors, SlotList, SlotSlice}, append_vec::{AppendVec, StoredAccount, StoredMeta}, - bank::deserialize_from_snapshot, }; -use bincode::{deserialize_from, serialize_into}; use byteorder::{ByteOrder, LittleEndian}; -use fs_extra::dir::CopyOptions; use lazy_static::lazy_static; use log::*; use rand::{thread_rng, Rng}; use rayon::{prelude::*, ThreadPool}; -use serde::{ - de::{MapAccess, Visitor}, - ser::{SerializeMap, Serializer}, - Deserialize, Serialize, -}; +use serde::{Deserialize, Serialize}; use solana_measure::measure::Measure; use solana_rayon_threadlimit::get_thread_count; use solana_sdk::{ @@ -45,8 +38,7 @@ use solana_sdk::{ }; use std::{ collections::{HashMap, HashSet}, - fmt, - io::{BufReader, Cursor, Error as IOError, ErrorKind, Read, Result as IOResult}, + io::{Error as IOError, Result as IOResult}, ops::RangeBounds, path::{Path, PathBuf}, sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, @@ -100,35 +92,7 @@ pub type SnapshotStorage = Vec>; pub type SnapshotStorages = Vec; // Each slot has a set of storage entries. -type SlotStores = HashMap>; - -struct AccountStorageVisitor; - -impl<'de> Visitor<'de> for AccountStorageVisitor { - type Value = AccountStorage; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("Expecting AccountStorage") - } - - #[allow(clippy::mutex_atomic)] - fn visit_map(self, mut access: M) -> Result - where - M: MapAccess<'de>, - { - let mut map = HashMap::new(); - while let Some((slot, storage_entries)) = access.next_entry()? { - let storage_entries: Vec = storage_entries; - let storage_slot_map = map.entry(slot).or_insert_with(HashMap::new); - for mut storage in storage_entries { - storage.slot = slot; - storage_slot_map.insert(storage.id, Arc::new(storage)); - } - } - - Ok(AccountStorage(map)) - } -} +pub(crate) type SlotStores = HashMap>; trait Versioned { fn version(&self) -> u64; @@ -146,32 +110,6 @@ impl Versioned for (u64, AccountInfo) { } } -struct AccountStorageSerialize<'a> { - account_storage_entries: &'a [SnapshotStorage], -} - -impl<'a> Serialize for AccountStorageSerialize<'a> { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut map = serializer.serialize_map(Some(self.account_storage_entries.len()))?; - let mut count = 0; - let mut serialize_account_storage_timer = Measure::start("serialize_account_storage_ms"); - for storage_entries in self.account_storage_entries { - map.serialize_entry(&storage_entries.first().unwrap().slot, storage_entries)?; - count += storage_entries.len(); - } - serialize_account_storage_timer.stop(); - datapoint_info!( - "serialize_account_storage_ms", - ("duration", serialize_account_storage_timer.as_ms(), i64), - ("num_entries", count, i64), - ); - map.end() - } -} - #[derive(Clone, Default, Debug)] pub struct AccountStorage(pub HashMap); @@ -193,22 +131,19 @@ impl AccountStorage { } } -impl<'de> Deserialize<'de> for AccountStorage { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - deserializer.deserialize_map(AccountStorageVisitor) - } -} - -#[derive(Debug, PartialEq, Copy, Clone, Deserialize, Serialize)] +#[derive(Debug, Eq, PartialEq, Copy, Clone, Deserialize, Serialize)] pub enum AccountStorageStatus { Available = 0, Full = 1, Candidate = 2, } +impl Default for AccountStorageStatus { + fn default() -> Self { + Self::Available + } +} + #[derive(Debug)] pub enum BankHashVerificationError { MismatchedAccountHash, @@ -217,15 +152,14 @@ pub enum BankHashVerificationError { } /// Persistent storage structure holding the accounts -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug)] pub struct AccountStorageEntry { - id: AppendVecId, + pub(crate) id: AppendVecId, - #[serde(skip)] - slot: Slot, + pub(crate) slot: Slot, /// storage holding the accounts - accounts: AppendVec, + pub(crate) accounts: AppendVec, /// Keeps track of the number of accounts stored in a specific AppendVec. /// This is periodically checked to reuse the stores that do not have @@ -235,13 +169,24 @@ pub struct AccountStorageEntry { count_and_status: RwLock<(usize, AccountStorageStatus)>, } +impl Default for AccountStorageEntry { + fn default() -> Self { + Self { + id: 0, + slot: 0, + accounts: AppendVec::new_empty_map(0), + count_and_status: RwLock::new((0, AccountStorageStatus::Available)), + } + } +} + impl AccountStorageEntry { pub fn new(path: &Path, slot: Slot, id: usize, file_size: u64) -> Self { let tail = AppendVec::new_relative_path(slot, id); let path = Path::new(path).join(&tail); let accounts = AppendVec::new(&path, true, file_size as usize); - AccountStorageEntry { + Self { id, slot, accounts, @@ -249,6 +194,15 @@ impl AccountStorageEntry { } } + pub(crate) fn new_empty_map(id: AppendVecId, accounts_current_len: usize) -> Self { + Self { + id, + slot: 0, + accounts: AppendVec::new_empty_map(accounts_current_len), + count_and_status: RwLock::new((0, AccountStorageStatus::Available)), + } + } + pub fn set_status(&self, mut status: AccountStorageStatus) { let mut count_and_status = self.count_and_status.write().unwrap(); @@ -365,55 +319,6 @@ pub fn get_temp_accounts_paths(count: u32) -> IOResult<(Vec, Vec { - accounts_db: &'a AccountsDB, - slot: Slot, - account_storage_entries: &'b [SnapshotStorage], -} - -impl<'a, 'b> AccountsDBSerialize<'a, 'b> { - pub fn new( - accounts_db: &'a AccountsDB, - slot: Slot, - account_storage_entries: &'b [SnapshotStorage], - ) -> Self { - Self { - accounts_db, - slot, - account_storage_entries, - } - } -} - -impl<'a, 'b> Serialize for AccountsDBSerialize<'a, 'b> { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::ser::Serializer, - { - use serde::ser::Error; - let mut wr = Cursor::new(vec![]); - let version = self.accounts_db.write_version.load(Ordering::Relaxed); - let account_storage_serialize = AccountStorageSerialize { - account_storage_entries: self.account_storage_entries, - }; - serialize_into(&mut wr, &account_storage_serialize).map_err(Error::custom)?; - serialize_into(&mut wr, &version).map_err(Error::custom)?; - let bank_hashes = self.accounts_db.bank_hashes.read().unwrap(); - serialize_into( - &mut wr, - &( - self.slot, - &*bank_hashes - .get(&self.slot) - .unwrap_or_else(|| panic!("No bank_hashes entry for slot {}", self.slot)), - ), - ) - .map_err(Error::custom)?; - let len = wr.position() as usize; - serializer.serialize_bytes(&wr.into_inner()[..len]) - } -} - #[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq)] pub struct BankHashStats { pub num_updated_accounts: u64, @@ -473,10 +378,10 @@ pub struct AccountsDB { pub next_id: AtomicUsize, pub shrink_candidate_slots: Mutex>, - write_version: AtomicU64, + pub(crate) write_version: AtomicU64, /// Set of storage paths to pick from - paths: Vec, + pub(crate) paths: Vec, /// Directory of paths this accounts_db needs to hold/remove temp_paths: Option>, @@ -592,102 +497,6 @@ impl AccountsDB { } } - pub fn accounts_from_stream>( - &self, - mut stream: &mut BufReader, - stream_append_vecs_path: P, - ) -> Result<(), IOError> { - let _len: usize = - deserialize_from(&mut stream).map_err(|e| AccountsDB::get_io_error(&e.to_string()))?; - let storage: AccountStorage = deserialize_from_snapshot(&mut stream) - .map_err(|e| AccountsDB::get_io_error(&e.to_string()))?; - - // Remap the deserialized AppendVec paths to point to correct local paths - let new_storage_map: Result, IOError> = storage - .0 - .into_iter() - .map(|(slot, mut slot_storage)| { - let mut new_slot_storage = HashMap::new(); - for (id, storage_entry) in slot_storage.drain() { - let path_index = thread_rng().gen_range(0, self.paths.len()); - let local_dir = &self.paths[path_index]; - - std::fs::create_dir_all(local_dir).expect("Create directory failed"); - - // Move the corresponding AppendVec from the snapshot into the directory pointed - // at by `local_dir` - let append_vec_relative_path = - AppendVec::new_relative_path(slot, storage_entry.id); - let append_vec_abs_path = stream_append_vecs_path - .as_ref() - .join(&append_vec_relative_path); - let target = local_dir.join(append_vec_abs_path.file_name().unwrap()); - if std::fs::rename(append_vec_abs_path.clone(), target).is_err() { - let mut copy_options = CopyOptions::new(); - copy_options.overwrite = true; - let e = fs_extra::move_items( - &vec![&append_vec_abs_path], - &local_dir, - ©_options, - ) - .map_err(|e| { - AccountsDB::get_io_error(&format!( - "Unable to move {:?} to {:?}: {}", - append_vec_abs_path, local_dir, e - )) - }); - if e.is_err() { - info!("{:?}", e); - continue; - } - }; - - // Notify the AppendVec of the new file location - let local_path = local_dir.join(append_vec_relative_path); - let mut u_storage_entry = Arc::try_unwrap(storage_entry).unwrap(); - u_storage_entry - .set_file(local_path) - .map_err(|e| AccountsDB::get_io_error(&e.to_string()))?; - new_slot_storage.insert(id, Arc::new(u_storage_entry)); - } - Ok((slot, new_slot_storage)) - }) - .collect(); - - let new_storage_map = new_storage_map?; - let mut storage = AccountStorage(new_storage_map); - - // discard any slots with no storage entries - // this can happen if a non-root slot was serialized - // but non-root stores should not be included in the snapshot - storage.0.retain(|_slot, stores| !stores.is_empty()); - - let version: u64 = deserialize_from(&mut stream) - .map_err(|_| AccountsDB::get_io_error("write version deserialize error"))?; - - let (slot, bank_hash): (Slot, BankHashInfo) = deserialize_from(&mut stream) - .map_err(|_| AccountsDB::get_io_error("bank hashes deserialize error"))?; - self.bank_hashes.write().unwrap().insert(slot, bank_hash); - - // Process deserialized data, set necessary fields in self - let max_id: usize = *storage - .0 - .values() - .flat_map(HashMap::keys) - .max() - .expect("At least one storage entry must exist from deserializing stream"); - - { - let mut stores = self.storage.write().unwrap(); - stores.0.extend(storage.0); - } - - self.next_id.store(max_id + 1, Ordering::Relaxed); - self.write_version.fetch_add(version, Ordering::Relaxed); - self.generate_index(); - Ok(()) - } - fn new_storage_entry(&self, slot: Slot, path: &Path, size: u64) -> AccountStorageEntry { AccountStorageEntry::new( path, @@ -1635,7 +1444,7 @@ impl AccountsDB { let accounts_index = self.accounts_index.read().unwrap(); let storage = self.storage.read().unwrap(); let keys: Vec<_> = accounts_index.account_maps.keys().collect(); - let mismatch_found = AtomicBool::new(false); + let mismatch_found = AtomicU64::new(0); let hashes: Vec<_> = keys .par_iter() .filter_map(|pubkey| { @@ -1652,9 +1461,7 @@ impl AccountsDB { if check_hash { let hash = Self::hash_stored_account(*slot, &account); if hash != *account.hash { - mismatch_found.store(true, Ordering::Relaxed); - } - if mismatch_found.load(Ordering::Relaxed) { + mismatch_found.fetch_add(1, Ordering::Relaxed); return None; } } @@ -1669,7 +1476,11 @@ impl AccountsDB { } }) .collect(); - if mismatch_found.load(Ordering::Relaxed) { + if mismatch_found.load(Ordering::Relaxed) > 0 { + warn!( + "{} mismatched account hash(es) found", + mismatch_found.load(Ordering::Relaxed) + ); return Err(MismatchedAccountHash); } @@ -2000,12 +1811,7 @@ impl AccountsDB { } } - fn get_io_error(error: &str) -> IOError { - warn!("AccountsDB error: {:?}", error); - IOError::new(ErrorKind::Other, error) - } - - fn generate_index(&self) { + pub fn generate_index(&self) { let storage = self.storage.read().unwrap(); let mut slots: Vec = storage.0.keys().cloned().collect(); slots.sort(); @@ -2085,14 +1891,11 @@ impl AccountsDB { pub mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; - use crate::accounts_index::RefCount; - use crate::append_vec::AccountMeta; + use crate::{accounts_index::RefCount, append_vec::AccountMeta}; use assert_matches::assert_matches; - use bincode::serialize_into; use rand::{thread_rng, Rng}; use solana_sdk::{account::Account, hash::HASH_BYTES}; use std::{fs, str::FromStr}; - use tempfile::TempDir; fn linear_ancestors(end_slot: u64) -> Ancestors { let mut ancestors: Ancestors = vec![(0, 0)].into_iter().collect(); @@ -2954,26 +2757,9 @@ pub mod tests { } fn reconstruct_accounts_db_via_serialization(accounts: &AccountsDB, slot: Slot) -> AccountsDB { - let mut writer = Cursor::new(vec![]); - let snapshot_storages = accounts.get_snapshot_storages(slot); - serialize_into( - &mut writer, - &AccountsDBSerialize::new(&accounts, slot, &snapshot_storages), - ) - .unwrap(); - - let buf = writer.into_inner(); - let mut reader = BufReader::new(&buf[..]); - let daccounts = AccountsDB::new(Vec::new()); - let copied_accounts = TempDir::new().unwrap(); - // Simulate obtaining a copy of the AppendVecs from a tarball - copy_append_vecs(&accounts, copied_accounts.path()).unwrap(); - daccounts - .accounts_from_stream(&mut reader, copied_accounts.path()) - .unwrap(); - + let daccounts = + crate::serde_snapshot::reconstruct_accounts_db_via_serialization(accounts, slot); print_count_and_status("daccounts", &daccounts); - daccounts } diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index cf722da67..283c21be1 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -1,4 +1,3 @@ -use bincode::{deserialize_from, serialize_into}; use memmap::MmapMut; use serde::{Deserialize, Serialize}; use solana_sdk::{ @@ -8,10 +7,9 @@ use solana_sdk::{ pubkey::Pubkey, }; use std::{ - fmt, fs::{remove_file, OpenOptions}, io, - io::{Cursor, Seek, SeekFrom, Write}, + io::{Seek, SeekFrom, Write}, mem, path::{Path, PathBuf}, sync::atomic::{AtomicUsize, Ordering}, @@ -175,7 +173,7 @@ impl AppendVec { } #[allow(clippy::mutex_atomic)] - fn new_empty_map(current_len: usize) -> Self { + pub(crate) fn new_empty_map(current_len: usize) -> Self { let map = MmapMut::map_anon(1).expect("failed to map the data file"); AppendVec { @@ -480,54 +478,6 @@ pub mod test_utils { } } -#[allow(clippy::mutex_atomic)] -impl Serialize for AppendVec { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::ser::Serializer, - { - use serde::ser::Error; - let len = std::mem::size_of::(); - let mut buf = vec![0u8; len]; - let mut wr = Cursor::new(&mut buf[..]); - serialize_into(&mut wr, &(self.current_len.load(Ordering::Relaxed) as u64)) - .map_err(Error::custom)?; - let len = wr.position() as usize; - serializer.serialize_bytes(&wr.into_inner()[..len]) - } -} - -struct AppendVecVisitor; - -impl<'a> serde::de::Visitor<'a> for AppendVecVisitor { - type Value = AppendVec; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("Expecting AppendVec") - } - - fn visit_bytes(self, data: &[u8]) -> std::result::Result - where - E: serde::de::Error, - { - use serde::de::Error; - let mut rd = Cursor::new(&data[..]); - let current_len: usize = deserialize_from(&mut rd).map_err(Error::custom)?; - // Note this does not initialize a valid Mmap in the AppendVec, needs to be done - // externally - Ok(AppendVec::new_empty_map(current_len)) - } -} - -impl<'de> Deserialize<'de> for AppendVec { - fn deserialize(deserializer: D) -> std::result::Result - where - D: ::serde::Deserializer<'de>, - { - deserializer.deserialize_bytes(AppendVecVisitor) - } -} - #[cfg(test)] pub mod tests { use super::test_utils::*; diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index acc6b6132..0ba7695f6 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -7,7 +7,7 @@ use crate::{ AccountAddressFilter, Accounts, TransactionAccounts, TransactionLoadResult, TransactionLoaders, }, - accounts_db::{AccountsDBSerialize, ErrorCounters, SnapshotStorage, SnapshotStorages}, + accounts_db::{ErrorCounters, SnapshotStorages}, accounts_index::Ancestors, blockhash_queue::BlockhashQueue, builtin_programs::get_builtin_programs, @@ -21,7 +21,6 @@ use crate::{ transaction_batch::TransactionBatch, transaction_utils::OrderedIterator, }; -use bincode::{deserialize_from, serialize_into}; use byteorder::{ByteOrder, LittleEndian}; use itertools::Itertools; use log::*; @@ -59,17 +58,15 @@ use solana_vote_program::vote_state::VoteState; use std::{ cell::RefCell, collections::{HashMap, HashSet}, - io::{BufReader, Cursor, Error as IOError, Read}, mem, ops::RangeInclusive, - path::{Path, PathBuf}, + path::PathBuf, rc::Rc, sync::atomic::{AtomicBool, AtomicU64, Ordering}, sync::{Arc, RwLock, RwLockReadGuard}, }; pub const SECONDS_PER_YEAR: f64 = 365.25 * 24.0 * 60.0 * 60.0; -pub const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB pub const MAX_LEADER_SCHEDULE_STAKES: Epoch = 5; @@ -98,73 +95,27 @@ type EpochCount = u64; #[derive(Default)] pub struct BankRc { /// where all the Accounts are stored - accounts: Arc, + pub accounts: Arc, /// Previous checkpoint of this bank - parent: RwLock>>, + pub(crate) parent: RwLock>>, /// Current slot - slot: Slot, + pub(crate) slot: Slot, } impl BankRc { - pub fn from_stream>( - account_paths: &[PathBuf], - slot: Slot, - ancestors: &Ancestors, - frozen_account_pubkeys: &[Pubkey], - mut stream: &mut BufReader, - stream_append_vecs_path: P, - ) -> std::result::Result { - let _len: usize = - deserialize_from(&mut stream).map_err(|e| BankRc::get_io_error(&e.to_string()))?; - - let accounts = Accounts::from_stream( - account_paths, - ancestors, - frozen_account_pubkeys, - stream, - stream_append_vecs_path, - )?; - - Ok(BankRc { + pub(crate) fn new(accounts: Accounts, slot: Slot) -> Self { + Self { accounts: Arc::new(accounts), parent: RwLock::new(None), slot, - }) + } } pub fn get_snapshot_storages(&self, slot: Slot) -> SnapshotStorages { self.accounts.accounts_db.get_snapshot_storages(slot) } - - fn get_io_error(error: &str) -> IOError { - warn!("BankRc error: {:?}", error); - std::io::Error::new(std::io::ErrorKind::Other, error) - } -} - -pub struct BankRcSerialize<'a, 'b> { - pub bank_rc: &'a BankRc, - pub snapshot_storages: &'b [SnapshotStorage], -} - -impl<'a, 'b> Serialize for BankRcSerialize<'a, 'b> { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::ser::Serializer, - { - use serde::ser::Error; - let mut wr = Cursor::new(Vec::new()); - let accounts_db_serialize = AccountsDBSerialize::new( - &*self.bank_rc.accounts.accounts_db, - self.bank_rc.slot, - self.snapshot_storages, - ); - serialize_into(&mut wr, &accounts_db_serialize).map_err(Error::custom)?; - let len = wr.position() as usize; - serializer.serialize_bytes(&wr.into_inner()[..len]) - } } #[derive(Default)] @@ -2586,31 +2537,16 @@ pub fn goto_end_of_slot(bank: &mut Bank) { } } -// This guards against possible memory exhaustions in bincode when restoring -// the full state from snapshot data files by imposing a fixed hard limit with -// ample of headrooms for such a usecase. -pub fn deserialize_from_snapshot(reader: R) -> bincode::Result -where - R: Read, - T: serde::de::DeserializeOwned, -{ - bincode::config() - .limit(MAX_SNAPSHOT_DATA_FILE_SIZE) - .deserialize_from(reader) -} - #[cfg(test)] mod tests { use super::*; use crate::{ - accounts_db::{get_temp_accounts_paths, tests::copy_append_vecs}, accounts_index::Ancestors, genesis_utils::{ create_genesis_config_with_leader, GenesisConfigInfo, BOOTSTRAP_VALIDATOR_LAMPORTS, }, status_cache::MAX_CACHE_ENTRIES, }; - use bincode::{serialize_into, serialized_size}; use solana_sdk::{ account::KeyedAccount, account_utils::StateMut, @@ -2636,8 +2572,7 @@ mod tests { vote_instruction, vote_state::{self, Vote, VoteInit, VoteState, MAX_LOCKOUT_HISTORY}, }; - use std::{io::Cursor, result, time::Duration}; - use tempfile::TempDir; + use std::{result, time::Duration}; #[test] fn test_hash_age_kind_is_durable_nonce() { @@ -5742,70 +5677,6 @@ mod tests { assert!(bank.is_delta.load(Ordering::Relaxed)); } - #[test] - fn test_bank_serialize() { - solana_logger::setup(); - let (genesis_config, _) = create_genesis_config(500); - let bank0 = Arc::new(Bank::new(&genesis_config)); - let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); - bank0.squash(); - - // Create an account on a non-root fork - let key1 = Keypair::new(); - bank1.deposit(&key1.pubkey(), 5); - - let bank2 = Bank::new_from_parent(&bank0, &Pubkey::default(), 2); - - // Test new account - let key2 = Keypair::new(); - bank2.deposit(&key2.pubkey(), 10); - assert_eq!(bank2.get_balance(&key2.pubkey()), 10); - - let key3 = Keypair::new(); - bank2.deposit(&key3.pubkey(), 0); - - bank2.squash(); - - let snapshot_storages = bank2.get_snapshot_storages(); - let rc_serialize = BankRcSerialize { - bank_rc: &bank2.rc, - snapshot_storages: &snapshot_storages, - }; - let len = serialized_size(&bank2).unwrap() + serialized_size(&rc_serialize).unwrap(); - let mut buf = vec![0u8; len as usize]; - let mut writer = Cursor::new(&mut buf[..]); - serialize_into(&mut writer, &bank2).unwrap(); - serialize_into(&mut writer, &rc_serialize).unwrap(); - - let mut rdr = Cursor::new(&buf[..]); - let mut dbank: Bank = deserialize_from_snapshot(&mut rdr).unwrap(); - let mut reader = BufReader::new(&buf[rdr.position() as usize..]); - - // Create a new set of directories for this bank's accounts - let (_accounts_dir, dbank_paths) = get_temp_accounts_paths(4).unwrap(); - let ref_sc = StatusCacheRc::default(); - ref_sc.status_cache.write().unwrap().add_root(2); - // Create a directory to simulate AppendVecs unpackaged from a snapshot tar - let copied_accounts = TempDir::new().unwrap(); - copy_append_vecs(&bank2.rc.accounts.accounts_db, copied_accounts.path()).unwrap(); - dbank.set_bank_rc( - BankRc::from_stream( - &dbank_paths, - dbank.slot(), - &dbank.ancestors, - &[], - &mut reader, - copied_accounts.path(), - ) - .unwrap(), - ref_sc, - ); - assert_eq!(dbank.get_balance(&key1.pubkey()), 0); - assert_eq!(dbank.get_balance(&key2.pubkey()), 10); - assert_eq!(dbank.get_balance(&key3.pubkey()), 0); - bank2.compare_bank(&dbank); - } - #[test] #[allow(clippy::float_cmp)] fn test_check_point_value() { diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 55768daea..9c0122b96 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -14,6 +14,7 @@ pub mod message_processor; mod native_loader; pub mod nonce_utils; pub mod rent_collector; +pub mod serde_snapshot; pub mod stakes; pub mod status_cache; mod system_instruction_processor; diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs new file mode 100644 index 000000000..6b344a984 --- /dev/null +++ b/runtime/src/serde_snapshot.rs @@ -0,0 +1,295 @@ +use { + crate::{ + accounts::Accounts, + accounts_db::{ + AccountStorageEntry, AccountStorageStatus, AccountsDB, AppendVecId, BankHashInfo, + }, + append_vec::AppendVec, + bank::BankRc, + }, + bincode::{deserialize_from, serialize_into}, + fs_extra::dir::CopyOptions, + log::{info, warn}, + rand::{thread_rng, Rng}, + serde::{ + de::{DeserializeOwned, Visitor}, + Deserialize, Deserializer, Serialize, Serializer, + }, + solana_sdk::clock::Slot, + std::{ + cmp::min, + collections::HashMap, + fmt::{Formatter, Result as FormatResult}, + io::{ + BufReader, BufWriter, Cursor, Error as IoError, ErrorKind as IoErrorKind, Read, Write, + }, + path::{Path, PathBuf}, + result::Result, + sync::{atomic::Ordering, Arc}, + }, +}; + +mod future; +mod legacy; +mod tests; +mod utils; + +use future::Context as TypeContextFuture; +use legacy::Context as TypeContextLegacy; +#[allow(unused_imports)] +use utils::{serialize_iter_as_map, serialize_iter_as_seq, serialize_iter_as_tuple}; + +// a number of test cases in accounts_db use this +#[cfg(test)] +pub(crate) use self::tests::reconstruct_accounts_db_via_serialization; + +pub use crate::accounts_db::{SnapshotStorage, SnapshotStorages}; + +#[derive(Copy, Clone, Eq, PartialEq)] +pub enum SerdeStyle { + NEWER, + OLDER, +} + +const MAX_ACCOUNTS_DB_STREAM_SIZE: u64 = 32 * 1024 * 1024 * 1024; + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct AccountDBFields(HashMap>, u64, Slot, BankHashInfo); + +pub trait TypeContext<'a> { + type SerializableAccountStorageEntry: Serialize + + DeserializeOwned + + From<&'a AccountStorageEntry> + + Into; + + fn serialize_bank_rc_fields( + serializer: S, + serializable_bank: &SerializableBankRc<'a, Self>, + ) -> std::result::Result + where + Self: std::marker::Sized; + + fn serialize_accounts_db_fields( + serializer: S, + serializable_db: &SerializableAccountsDB<'a, Self>, + ) -> std::result::Result + where + Self: std::marker::Sized; + + fn deserialize_accounts_db_fields( + stream: &mut BufReader, + ) -> Result, IoError> + where + R: Read; + + // we might define fn (de)serialize_bank(...) -> Result for versionized bank serialization in the future +} + +fn bankrc_to_io_error(error: T) -> IoError { + let msg = error.to_string(); + warn!("BankRc error: {:?}", msg); + IoError::new(IoErrorKind::Other, msg) +} + +fn accountsdb_to_io_error(error: T) -> IoError { + let msg = error.to_string(); + warn!("AccountsDB error: {:?}", msg); + IoError::new(IoErrorKind::Other, msg) +} + +pub fn bankrc_from_stream( + serde_style: SerdeStyle, + account_paths: &[PathBuf], + slot: Slot, + stream: &mut BufReader, + stream_append_vecs_path: P, +) -> std::result::Result +where + R: Read, + P: AsRef, +{ + macro_rules! INTO { + ($x:ident) => { + Ok(BankRc::new( + Accounts::new_empty(context_accountsdb_from_fields::<$x, P>( + $x::deserialize_accounts_db_fields(stream)?, + account_paths, + stream_append_vecs_path, + )?), + slot, + )) + }; + } + match serde_style { + SerdeStyle::NEWER => INTO!(TypeContextFuture), + SerdeStyle::OLDER => INTO!(TypeContextLegacy), + } +} + +pub fn bankrc_to_stream( + serde_style: SerdeStyle, + stream: &mut BufWriter, + bank_rc: &BankRc, + snapshot_storages: &[SnapshotStorage], +) -> Result<(), IoError> +where + W: Write, +{ + macro_rules! INTO { + ($x:ident) => { + serialize_into( + stream, + &SerializableBankRc::<$x> { + bank_rc, + snapshot_storages, + phantom: std::marker::PhantomData::default(), + }, + ) + .map_err(bankrc_to_io_error) + }; + } + match serde_style { + SerdeStyle::NEWER => INTO!(TypeContextFuture), + SerdeStyle::OLDER => INTO!(TypeContextLegacy), + } +} + +pub struct SerializableBankRc<'a, C> { + bank_rc: &'a BankRc, + snapshot_storages: &'a [SnapshotStorage], + phantom: std::marker::PhantomData, +} + +impl<'a, C: TypeContext<'a>> Serialize for SerializableBankRc<'a, C> { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::ser::Serializer, + { + C::serialize_bank_rc_fields(serializer, self) + } +} + +pub struct SerializableAccountsDB<'a, C> { + accounts_db: &'a AccountsDB, + slot: Slot, + account_storage_entries: &'a [SnapshotStorage], + phantom: std::marker::PhantomData, +} + +impl<'a, C: TypeContext<'a>> Serialize for SerializableAccountsDB<'a, C> { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::ser::Serializer, + { + C::serialize_accounts_db_fields(serializer, self) + } +} + +fn context_accountsdb_from_fields<'a, C, P>( + account_db_fields: AccountDBFields, + account_paths: &[PathBuf], + stream_append_vecs_path: P, +) -> Result +where + C: TypeContext<'a>, + P: AsRef, +{ + let accounts_db = AccountsDB::new(account_paths.to_vec()); + + let AccountDBFields(storage, version, slot, bank_hash_info) = account_db_fields; + + // convert to two level map of slot -> id -> account storage entry + let storage = { + let mut map = HashMap::new(); + for (slot, entries) in storage.into_iter() { + let sub_map = map.entry(slot).or_insert_with(HashMap::new); + for entry in entries.into_iter() { + let mut entry: AccountStorageEntry = entry.into(); + entry.slot = slot; + sub_map.insert(entry.id, Arc::new(entry)); + } + } + map + }; + + // Remap the deserialized AppendVec paths to point to correct local paths + let mut storage = storage + .into_iter() + .map(|(slot, mut slot_storage)| { + let mut new_slot_storage = HashMap::new(); + for (id, storage_entry) in slot_storage.drain() { + let path_index = thread_rng().gen_range(0, accounts_db.paths.len()); + let local_dir = &accounts_db.paths[path_index]; + + std::fs::create_dir_all(local_dir).expect("Create directory failed"); + + // Move the corresponding AppendVec from the snapshot into the directory pointed + // at by `local_dir` + let append_vec_relative_path = AppendVec::new_relative_path(slot, storage_entry.id); + let append_vec_abs_path = stream_append_vecs_path + .as_ref() + .join(&append_vec_relative_path); + let target = local_dir.join(append_vec_abs_path.file_name().unwrap()); + if std::fs::rename(append_vec_abs_path.clone(), target).is_err() { + let mut copy_options = CopyOptions::new(); + copy_options.overwrite = true; + let e = fs_extra::move_items( + &vec![&append_vec_abs_path], + &local_dir, + ©_options, + ) + .map_err(|e| { + format!( + "unable to move {:?} to {:?}: {}", + append_vec_abs_path, local_dir, e + ) + }) + .map_err(accountsdb_to_io_error); + if e.is_err() { + info!("{:?}", e); + continue; + } + }; + + // Notify the AppendVec of the new file location + let local_path = local_dir.join(append_vec_relative_path); + let mut u_storage_entry = Arc::try_unwrap(storage_entry).unwrap(); + u_storage_entry + .set_file(local_path) + .map_err(accountsdb_to_io_error)?; + new_slot_storage.insert(id, Arc::new(u_storage_entry)); + } + Ok((slot, new_slot_storage)) + }) + .collect::, IoError>>()?; + + // discard any slots with no storage entries + // this can happen if a non-root slot was serialized + // but non-root stores should not be included in the snapshot + storage.retain(|_slot, stores| !stores.is_empty()); + + accounts_db + .bank_hashes + .write() + .unwrap() + .insert(slot, bank_hash_info); + + // Process deserialized data, set necessary fields in self + let max_id: usize = *storage + .values() + .flat_map(HashMap::keys) + .max() + .expect("At least one storage entry must exist from deserializing stream"); + + { + let mut stores = accounts_db.storage.write().unwrap(); + stores.0.extend(storage); + } + + accounts_db.next_id.store(max_id + 1, Ordering::Relaxed); + accounts_db + .write_version + .fetch_add(version, Ordering::Relaxed); + accounts_db.generate_index(); + Ok(accounts_db) +} diff --git a/runtime/src/serde_snapshot/future.rs b/runtime/src/serde_snapshot/future.rs new file mode 100644 index 000000000..24458b22c --- /dev/null +++ b/runtime/src/serde_snapshot/future.rs @@ -0,0 +1,101 @@ +use {super::*, solana_measure::measure::Measure, std::cell::RefCell}; + +// Serializable version of AccountStorageEntry for snapshot format +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +pub(super) struct SerializableAccountStorageEntry { + id: AppendVecId, + accounts_current_len: usize, +} + +impl From<&AccountStorageEntry> for SerializableAccountStorageEntry { + fn from(rhs: &AccountStorageEntry) -> Self { + Self { + id: rhs.id, + accounts_current_len: rhs.accounts.len(), + } + } +} + +impl Into for SerializableAccountStorageEntry { + fn into(self) -> AccountStorageEntry { + AccountStorageEntry::new_empty_map(self.id, self.accounts_current_len) + } +} + +pub(super) struct Context {} +impl<'a> TypeContext<'a> for Context { + type SerializableAccountStorageEntry = SerializableAccountStorageEntry; + + fn serialize_bank_rc_fields( + serializer: S, + serializable_bank: &SerializableBankRc<'a, Self>, + ) -> std::result::Result + where + Self: std::marker::Sized, + { + let accounts_db_serialize = SerializableAccountsDB::<'a, Self> { + accounts_db: &*serializable_bank.bank_rc.accounts.accounts_db, + slot: serializable_bank.bank_rc.slot, + account_storage_entries: serializable_bank.snapshot_storages, + phantom: std::marker::PhantomData::default(), + }; + + accounts_db_serialize.serialize(serializer) + } + + fn serialize_accounts_db_fields( + serializer: S, + serializable_db: &SerializableAccountsDB<'a, Self>, + ) -> std::result::Result + where + Self: std::marker::Sized, + { + // sample write version before serializing storage entries + let version = serializable_db + .accounts_db + .write_version + .load(Ordering::Relaxed); + + // (1st of 3 elements) write the list of account storage entry lists out as a map + let entry_count = RefCell::::new(0); + let entries = + serialize_iter_as_map(serializable_db.account_storage_entries.iter().map(|x| { + *entry_count.borrow_mut() += x.len(); + ( + x.first().unwrap().slot, + serialize_iter_as_seq( + x.iter() + .map(|x| Self::SerializableAccountStorageEntry::from(x.as_ref())), + ), + ) + })); + let slot = serializable_db.slot; + let hash = serializable_db + .accounts_db + .bank_hashes + .read() + .unwrap() + .get(&serializable_db.slot) + .unwrap_or_else(|| panic!("No bank_hashes entry for slot {}", serializable_db.slot)) + .clone(); + + let mut serialize_account_storage_timer = Measure::start("serialize_account_storage_ms"); + let result = (entries, version, slot, hash).serialize(serializer); + serialize_account_storage_timer.stop(); + datapoint_info!( + "serialize_account_storage_ms", + ("duration", serialize_account_storage_timer.as_ms(), i64), + ("num_entries", *entry_count.borrow(), i64), + ); + result + } + + fn deserialize_accounts_db_fields( + mut stream: &mut BufReader, + ) -> Result, IoError> + where + R: Read, + { + deserialize_from(&mut stream).map_err(accountsdb_to_io_error) + } +} diff --git a/runtime/src/serde_snapshot/legacy.rs b/runtime/src/serde_snapshot/legacy.rs new file mode 100644 index 000000000..8277a078c --- /dev/null +++ b/runtime/src/serde_snapshot/legacy.rs @@ -0,0 +1,209 @@ +use {super::*, solana_measure::measure::Measure, std::cell::RefCell}; + +// Serializable version of AccountStorageEntry for snapshot format +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +pub(super) struct SerializableAccountStorageEntry { + id: AppendVecId, + accounts: SerializableAppendVec, + count_and_status: (usize, AccountStorageStatus), +} + +impl From<&AccountStorageEntry> for SerializableAccountStorageEntry { + fn from(rhs: &AccountStorageEntry) -> Self { + Self { + id: rhs.id, + accounts: SerializableAppendVec::from(&rhs.accounts), + ..Self::default() + } + } +} + +impl Into for SerializableAccountStorageEntry { + fn into(self) -> AccountStorageEntry { + AccountStorageEntry::new_empty_map(self.id, self.accounts.current_len) + } +} + +// Serializable version of AppendVec for snapshot format +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +struct SerializableAppendVec { + current_len: usize, +} + +impl From<&AppendVec> for SerializableAppendVec { + fn from(rhs: &AppendVec) -> SerializableAppendVec { + SerializableAppendVec { + current_len: rhs.len(), + } + } +} + +impl Into for SerializableAppendVec { + fn into(self) -> AppendVec { + AppendVec::new_empty_map(self.current_len) + } +} + +// Serialization of AppendVec requires serialization of u64 to +// eight byte vector which is then itself serialized to the stream +impl Serialize for SerializableAppendVec { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + const LEN: usize = std::mem::size_of::(); + let mut buf = [0u8; LEN]; + serialize_into(Cursor::new(&mut buf[..]), &(self.current_len as u64)) + .map_err(serde::ser::Error::custom)?; + serializer.serialize_bytes(&buf) + } +} + +// Deserialization of AppendVec requires deserialization +// of eight byte vector from which u64 is then deserialized +impl<'de> Deserialize<'de> for SerializableAppendVec { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + use serde::de::Error; + struct SerializableAppendVecVisitor; + impl<'a> Visitor<'a> for SerializableAppendVecVisitor { + type Value = SerializableAppendVec; + fn expecting(&self, formatter: &mut Formatter) -> FormatResult { + formatter.write_str("Expecting SerializableAppendVec") + } + fn visit_bytes(self, data: &[u8]) -> std::result::Result + where + E: Error, + { + const LEN: u64 = std::mem::size_of::() as u64; + let mut rd = Cursor::new(&data[..]); + let current_len: usize = deserialize_from(&mut rd).map_err(Error::custom)?; + if rd.position() != LEN { + Err(Error::custom("SerializableAppendVec: unexpected length")) + } else { + Ok(SerializableAppendVec { current_len }) + } + } + } + deserializer.deserialize_bytes(SerializableAppendVecVisitor) + } +} + +pub(super) struct Context {} +impl<'a> TypeContext<'a> for Context { + type SerializableAccountStorageEntry = SerializableAccountStorageEntry; + + fn serialize_bank_rc_fields( + serializer: S, + serializable_bank: &SerializableBankRc<'a, Self>, + ) -> std::result::Result + where + Self: std::marker::Sized, + { + // as there is no deserialize_bank_rc_fields(), do not emit the u64 + // size field here and have serialize_accounts_db_fields() emit two + // u64 size fields instead + SerializableAccountsDB::<'a, Self> { + accounts_db: &*serializable_bank.bank_rc.accounts.accounts_db, + slot: serializable_bank.bank_rc.slot, + account_storage_entries: serializable_bank.snapshot_storages, + phantom: std::marker::PhantomData::default(), + } + .serialize(serializer) + } + + fn serialize_accounts_db_fields( + serializer: S, + serializable_db: &SerializableAccountsDB<'a, Self>, + ) -> std::result::Result + where + Self: std::marker::Sized, + { + // sample write version before serializing storage entries + let version = serializable_db + .accounts_db + .write_version + .load(Ordering::Relaxed); + + // (1st of 3 elements) write the list of account storage entry lists out as a map + let entry_count = RefCell::::new(0); + let entries = + serialize_iter_as_map(serializable_db.account_storage_entries.iter().map(|x| { + *entry_count.borrow_mut() += x.len(); + ( + x.first().unwrap().slot, + serialize_iter_as_seq( + x.iter() + .map(|x| Self::SerializableAccountStorageEntry::from(x.as_ref())), + ), + ) + })); + + let slot_hash = ( + serializable_db.slot, + serializable_db + .accounts_db + .bank_hashes + .read() + .unwrap() + .get(&serializable_db.slot) + .unwrap_or_else(|| panic!("No bank_hashes entry for slot {}", serializable_db.slot)) + .clone(), + ); + + let mut serialize_account_storage_timer = Measure::start("serialize_account_storage_ms"); + let result = ( + &MAX_ACCOUNTS_DB_STREAM_SIZE, + &MAX_ACCOUNTS_DB_STREAM_SIZE, + &entries, + &version, + &slot_hash, + ) + .serialize(serializer); + serialize_account_storage_timer.stop(); + datapoint_info!( + "serialize_account_storage_ms", + ("duration", serialize_account_storage_timer.as_ms(), i64), + ("num_entries", *entry_count.borrow(), i64), + ); + result + } + + fn deserialize_accounts_db_fields( + mut stream: &mut BufReader, + ) -> Result, IoError> + where + R: Read, + { + // read and discard two u64 byte vector lengths + let serialized_len = MAX_ACCOUNTS_DB_STREAM_SIZE; + let serialized_len = min( + serialized_len, + deserialize_from(&mut stream).map_err(accountsdb_to_io_error)?, + ); + let serialized_len = min( + serialized_len, + deserialize_from(&mut stream).map_err(accountsdb_to_io_error)?, + ); + + // (1st of 3 elements) read in map of slots to account storage entries + let storage: HashMap> = bincode::config() + .limit(serialized_len) + .deserialize_from(&mut stream) + .map_err(accountsdb_to_io_error)?; + + // (2nd of 3 elements) read in write version + let version: u64 = deserialize_from(&mut stream) + .map_err(|e| format!("write version deserialize error: {}", e.to_string())) + .map_err(accountsdb_to_io_error)?; + + // (3rd of 3 elements) read in (slot, bank hashes) pair + let (slot, bank_hash_info): (Slot, BankHashInfo) = deserialize_from(&mut stream) + .map_err(|e| format!("bank hashes deserialize error: {}", e.to_string())) + .map_err(accountsdb_to_io_error)?; + + Ok(AccountDBFields(storage, version, slot, bank_hash_info)) + } +} diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs new file mode 100644 index 000000000..21aebf8ac --- /dev/null +++ b/runtime/src/serde_snapshot/tests.rs @@ -0,0 +1,282 @@ +#[cfg(test)] +use { + super::*, + crate::{ + accounts::{create_test_accounts, Accounts}, + accounts_db::get_temp_accounts_paths, + bank::{Bank, StatusCacheRc}, + }, + rand::{thread_rng, Rng}, + solana_sdk::{ + account::Account, + clock::Slot, + genesis_config::create_genesis_config, + pubkey::Pubkey, + signature::{Keypair, Signer}, + }, + std::io::{BufReader, Cursor}, + tempfile::TempDir, +}; + +#[cfg(test)] +fn copy_append_vecs>( + accounts_db: &AccountsDB, + output_dir: P, +) -> std::io::Result<()> { + let storage_entries = accounts_db.get_snapshot_storages(Slot::max_value()); + for storage in storage_entries.iter().flatten() { + let storage_path = storage.get_path(); + let output_path = output_dir.as_ref().join( + storage_path + .file_name() + .expect("Invalid AppendVec file path"), + ); + + std::fs::copy(storage_path, output_path)?; + } + + Ok(()) +} + +#[cfg(test)] +fn check_accounts(accounts: &Accounts, pubkeys: &[Pubkey], num: usize) { + for _ in 1..num { + let idx = thread_rng().gen_range(0, num - 1); + let ancestors = vec![(0, 0)].into_iter().collect(); + let account = accounts.load_slow(&ancestors, &pubkeys[idx]); + let account1 = Some(( + Account::new((idx + 1) as u64, 0, &Account::default().owner), + 0, + )); + assert_eq!(account, account1); + } +} + +#[cfg(test)] +fn context_accountsdb_from_stream<'a, C, R, P>( + stream: &mut BufReader, + account_paths: &[PathBuf], + stream_append_vecs_path: P, +) -> Result +where + C: TypeContext<'a>, + R: Read, + P: AsRef, +{ + // read and deserialise the accounts database directly from the stream + context_accountsdb_from_fields::( + C::deserialize_accounts_db_fields(stream)?, + account_paths, + stream_append_vecs_path, + ) +} + +#[cfg(test)] +fn accountsdb_from_stream( + serde_style: SerdeStyle, + stream: &mut BufReader, + account_paths: &[PathBuf], + stream_append_vecs_path: P, +) -> Result +where + R: Read, + P: AsRef, +{ + match serde_style { + SerdeStyle::NEWER => context_accountsdb_from_stream::( + stream, + account_paths, + stream_append_vecs_path, + ), + SerdeStyle::OLDER => context_accountsdb_from_stream::( + stream, + account_paths, + stream_append_vecs_path, + ), + } +} + +#[cfg(test)] +fn accountsdb_to_stream( + serde_style: SerdeStyle, + stream: &mut W, + accounts_db: &AccountsDB, + slot: Slot, + account_storage_entries: &[SnapshotStorage], +) -> Result<(), IoError> +where + W: Write, +{ + match serde_style { + SerdeStyle::NEWER => serialize_into( + stream, + &SerializableAccountsDB:: { + accounts_db, + slot, + account_storage_entries, + phantom: std::marker::PhantomData::default(), + }, + ) + .map_err(bankrc_to_io_error), + SerdeStyle::OLDER => serialize_into( + stream, + &SerializableAccountsDB:: { + accounts_db, + slot, + account_storage_entries, + phantom: std::marker::PhantomData::default(), + }, + ) + .map_err(bankrc_to_io_error), + } +} + +#[cfg(test)] +fn test_accounts_serialize_style(serde_style: SerdeStyle) { + solana_logger::setup(); + let (_accounts_dir, paths) = get_temp_accounts_paths(4).unwrap(); + let accounts = Accounts::new(paths); + + let mut pubkeys: Vec = vec![]; + create_test_accounts(&accounts, &mut pubkeys, 100, 0); + check_accounts(&accounts, &pubkeys, 100); + accounts.add_root(0); + + let mut writer = Cursor::new(vec![]); + accountsdb_to_stream( + serde_style, + &mut writer, + &*accounts.accounts_db, + 0, + &accounts.accounts_db.get_snapshot_storages(0), + ) + .unwrap(); + + let copied_accounts = TempDir::new().unwrap(); + + // Simulate obtaining a copy of the AppendVecs from a tarball + copy_append_vecs(&accounts.accounts_db, copied_accounts.path()).unwrap(); + + let buf = writer.into_inner(); + let mut reader = BufReader::new(&buf[..]); + let (_accounts_dir, daccounts_paths) = get_temp_accounts_paths(2).unwrap(); + let daccounts = Accounts::new_empty( + accountsdb_from_stream( + serde_style, + &mut reader, + &daccounts_paths, + copied_accounts.path(), + ) + .unwrap(), + ); + check_accounts(&daccounts, &pubkeys, 100); + assert_eq!(accounts.bank_hash_at(0), daccounts.bank_hash_at(0)); +} + +#[cfg(test)] +fn test_bank_serialize_style(serde_style: SerdeStyle) { + solana_logger::setup(); + let (genesis_config, _) = create_genesis_config(500); + let bank0 = Arc::new(Bank::new(&genesis_config)); + let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); + bank0.squash(); + + // Create an account on a non-root fork + let key1 = Keypair::new(); + bank1.deposit(&key1.pubkey(), 5); + + let bank2 = Bank::new_from_parent(&bank0, &Pubkey::default(), 2); + + // Test new account + let key2 = Keypair::new(); + bank2.deposit(&key2.pubkey(), 10); + assert_eq!(bank2.get_balance(&key2.pubkey()), 10); + + let key3 = Keypair::new(); + bank2.deposit(&key3.pubkey(), 0); + + bank2.squash(); + + let snapshot_storages = bank2.get_snapshot_storages(); + let mut buf = vec![]; + let mut writer = Cursor::new(&mut buf); + serialize_into(&mut writer, &bank2).unwrap(); + crate::serde_snapshot::bankrc_to_stream( + serde_style, + &mut std::io::BufWriter::new(&mut writer), + &bank2.rc, + &snapshot_storages, + ) + .unwrap(); + + let mut rdr = Cursor::new(&buf[..]); + let mut dbank: Bank = bincode::deserialize_from(&mut rdr).unwrap(); + let mut reader = std::io::BufReader::new(&buf[rdr.position() as usize..]); + + // Create a new set of directories for this bank's accounts + let (_accounts_dir, dbank_paths) = get_temp_accounts_paths(4).unwrap(); + let ref_sc = StatusCacheRc::default(); + ref_sc.status_cache.write().unwrap().add_root(2); + // Create a directory to simulate AppendVecs unpackaged from a snapshot tar + let copied_accounts = TempDir::new().unwrap(); + copy_append_vecs(&bank2.rc.accounts.accounts_db, copied_accounts.path()).unwrap(); + dbank.set_bank_rc( + crate::serde_snapshot::bankrc_from_stream( + serde_style, + &dbank_paths, + dbank.slot(), + &mut reader, + copied_accounts.path(), + ) + .unwrap(), + ref_sc, + ); + assert_eq!(dbank.get_balance(&key1.pubkey()), 0); + assert_eq!(dbank.get_balance(&key2.pubkey()), 10); + assert_eq!(dbank.get_balance(&key3.pubkey()), 0); + bank2.compare_bank(&dbank); +} + +#[cfg(test)] +pub(crate) fn reconstruct_accounts_db_via_serialization( + accounts: &AccountsDB, + slot: Slot, +) -> AccountsDB { + let mut writer = Cursor::new(vec![]); + let snapshot_storages = accounts.get_snapshot_storages(slot); + accountsdb_to_stream( + SerdeStyle::NEWER, + &mut writer, + &accounts, + slot, + &snapshot_storages, + ) + .unwrap(); + + let buf = writer.into_inner(); + let mut reader = BufReader::new(&buf[..]); + let copied_accounts = TempDir::new().unwrap(); + // Simulate obtaining a copy of the AppendVecs from a tarball + copy_append_vecs(&accounts, copied_accounts.path()).unwrap(); + accountsdb_from_stream(SerdeStyle::NEWER, &mut reader, &[], copied_accounts.path()).unwrap() +} + +#[test] +fn test_accounts_serialize_newer() { + test_accounts_serialize_style(SerdeStyle::NEWER) +} + +#[test] +fn test_accounts_serialize_older() { + test_accounts_serialize_style(SerdeStyle::OLDER) +} + +#[test] +fn test_bank_serialize_newer() { + test_bank_serialize_style(SerdeStyle::NEWER) +} + +#[test] +fn test_bank_serialize_older() { + test_bank_serialize_style(SerdeStyle::OLDER) +} diff --git a/runtime/src/serde_snapshot/utils.rs b/runtime/src/serde_snapshot/utils.rs new file mode 100644 index 000000000..043cc5147 --- /dev/null +++ b/runtime/src/serde_snapshot/utils.rs @@ -0,0 +1,107 @@ +use serde::{ + ser::{SerializeSeq, SerializeTuple}, + Serialize, Serializer, +}; + +// consumes an iterator and returns an object that will serialize as a serde seq +#[allow(dead_code)] +pub fn serialize_iter_as_seq(iter: I) -> impl Serialize +where + I: IntoIterator, + ::Item: Serialize, + ::IntoIter: ExactSizeIterator, +{ + struct SerializableSequencedIterator { + iter: std::cell::RefCell>, + } + + impl Serialize for SerializableSequencedIterator + where + I: IntoIterator, + ::Item: Serialize, + ::IntoIter: ExactSizeIterator, + { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let iter = self.iter.borrow_mut().take().unwrap().into_iter(); + let mut seq = serializer.serialize_seq(Some(iter.len()))?; + for item in iter { + seq.serialize_element(&item)?; + } + seq.end() + } + } + + SerializableSequencedIterator { + iter: std::cell::RefCell::new(Some(iter)), + } +} + +// consumes an iterator and returns an object that will serialize as a serde tuple +#[allow(dead_code)] +pub fn serialize_iter_as_tuple(iter: I) -> impl Serialize +where + I: IntoIterator, + ::Item: Serialize, + ::IntoIter: ExactSizeIterator, +{ + struct SerializableSequencedIterator { + iter: std::cell::RefCell>, + } + + impl Serialize for SerializableSequencedIterator + where + I: IntoIterator, + ::Item: Serialize, + ::IntoIter: ExactSizeIterator, + { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let iter = self.iter.borrow_mut().take().unwrap().into_iter(); + let mut tup = serializer.serialize_tuple(iter.len())?; + for item in iter { + tup.serialize_element(&item)?; + } + tup.end() + } + } + + SerializableSequencedIterator { + iter: std::cell::RefCell::new(Some(iter)), + } +} + +// consumes a 2-tuple iterator and returns an object that will serialize as a serde map +#[allow(dead_code)] +pub fn serialize_iter_as_map(iter: I) -> impl Serialize +where + K: Serialize, + V: Serialize, + I: IntoIterator, +{ + struct SerializableMappedIterator { + iter: std::cell::RefCell>, + } + + impl Serialize for SerializableMappedIterator + where + K: Serialize, + V: Serialize, + I: IntoIterator, + { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.collect_map(self.iter.borrow_mut().take().unwrap()) + } + } + + SerializableMappedIterator { + iter: std::cell::RefCell::new(Some(iter)), + } +}