From 1dbb5c8647d8cc7308336f0a90568d98ffa10d63 Mon Sep 17 00:00:00 2001 From: carllin Date: Mon, 5 Aug 2019 22:53:19 -0700 Subject: [PATCH] Deserialize snapshots (#5417) * Deserialize snapshots --- Cargo.lock | 2 + core/src/bank_forks.rs | 135 ++++++++++---------- core/src/replicator.rs | 2 +- core/src/snapshot_package.rs | 21 ++-- core/src/snapshot_utils.rs | 99 ++++++++------- core/src/storage_stage.rs | 2 +- core/src/validator.rs | 44 ++++--- runtime/Cargo.toml | 2 + runtime/src/accounts.rs | 30 +++-- runtime/src/accounts_db.rs | 227 ++++++++++++++++++++++------------ runtime/src/accounts_index.rs | 2 - runtime/src/append_vec.rs | 69 ++++++++--- runtime/src/bank.rs | 61 +++++---- runtime/src/lib.rs | 3 + 14 files changed, 433 insertions(+), 266 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2802a972e0..374ba91947 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3603,6 +3603,7 @@ dependencies = [ "bv 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "fs_extra 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "hashbrown 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.60 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3627,6 +3628,7 @@ dependencies = [ "solana-vote-api 0.18.0-pre1", "solana-vote-program 0.18.0-pre1", "sys-info 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index bed44de322..452a170df4 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -2,12 +2,14 @@ use crate::result::{Error, Result}; use crate::snapshot_package::SnapshotPackageSender; +use crate::snapshot_package::{TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR}; use crate::snapshot_utils; +use crate::snapshot_utils::untar_snapshot_in; +use fs_extra::dir::CopyOptions; use solana_measure::measure::Measure; use solana_metrics::inc_new_counter_info; -use solana_runtime::bank::{Bank, BankRc, StatusCacheRc}; +use solana_runtime::bank::Bank; use solana_runtime::status_cache::MAX_CACHE_ENTRIES; -use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::timing; use std::collections::{HashMap, HashSet}; use std::fs; @@ -275,7 +277,7 @@ impl BankForks { .get(root) .cloned() .expect("root must exist in BankForks"); - snapshot_utils::add_snapshot(&config.snapshot_path, &bank, root)?; + snapshot_utils::add_snapshot(&config.snapshot_path, &bank)?; // Package the relevant snapshots let names = snapshot_utils::get_snapshot_names(&config.snapshot_path); @@ -348,50 +350,35 @@ impl BankForks { &self.snapshot_config } - fn setup_banks( - bank_maps: &mut Vec<(u64, u64, Bank)>, - bank_rc: &BankRc, - status_cache_rc: &StatusCacheRc, - ) -> (HashMap>, u64) { - let mut banks = HashMap::new(); - let (last_slot, last_parent_slot, mut last_bank) = bank_maps.remove(0); - last_bank.set_bank_rc(&bank_rc, &status_cache_rc); - - while let Some((slot, parent_slot, mut bank)) = bank_maps.pop() { - bank.set_bank_rc(&bank_rc, &status_cache_rc); - if parent_slot != 0 { - if let Some(parent) = banks.get(&parent_slot) { - bank.set_parent(parent); - } - } - if slot > 0 { - banks.insert(slot, Arc::new(bank)); - } - } - if last_parent_slot != 0 { - if let Some(parent) = banks.get(&last_parent_slot) { - last_bank.set_parent(parent); - } - } - banks.insert(last_slot, Arc::new(last_bank)); - - (banks, last_slot) - } - - pub fn load_from_snapshot( - genesis_block: &GenesisBlock, - account_paths: Option, + pub fn load_from_snapshot>( + account_paths: String, snapshot_config: &SnapshotConfig, + snapshot_tar: P, ) -> Result { fs::create_dir_all(&snapshot_config.snapshot_path)?; - let names = snapshot_utils::get_snapshot_names(&snapshot_config.snapshot_path); - if names.is_empty() { - return Err(Error::IO(IOError::new( - ErrorKind::Other, - "no snapshots found", - ))); - } - let mut bank_maps = vec![]; + // Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()` + let unpack_dir = tempfile::tempdir_in(snapshot_config.snapshot_path())?; + untar_snapshot_in(&snapshot_tar, &unpack_dir)?; + + let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR); + let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR); + let bank = snapshot_utils::bank_from_snapshots( + account_paths, + &unpacked_snapshots_dir, + unpacked_accounts_dir, + )?; + + let bank = Arc::new(bank); + // Move the unpacked snapshots into `snapshot_config.snapshot_path()` + let dir_files = fs::read_dir(unpacked_snapshots_dir).expect("Invalid snapshot path"); + let paths: Vec = dir_files + .filter_map(|entry| entry.ok().map(|e| e.path())) + .collect(); + let mut copy_options = CopyOptions::new(); + copy_options.overwrite = true; + fs_extra::move_items(&paths, snapshot_config.snapshot_path(), ©_options)?; + + /*let mut bank_maps = vec![]; let status_cache_rc = StatusCacheRc::default(); let id = (names[names.len() - 1] + 1) as usize; let mut bank0 = @@ -411,17 +398,27 @@ impl BankForks { ))); } - let root = bank_root.unwrap(); - let (banks, last_slot) = - BankForks::setup_banks(&mut bank_maps, &bank0.rc, &status_cache_rc); - let working_bank = banks[&last_slot].clone(); + let (banks, last_slot) = BankForks::setup_banks(&mut bank_maps, &bank.rc, &status_cache_rc); + let working_bank = banks[&last_slot].clone();*/ + let mut banks = HashMap::new(); + banks.insert(bank.slot(), bank.clone()); + let root = bank.slot(); + let names = snapshot_utils::get_snapshot_names(&snapshot_config.snapshot_path); + if names.is_empty() { + return Err(Error::IO(IOError::new( + ErrorKind::Other, + "no snapshots found", + ))); + } Ok(BankForks { banks, - working_bank, + working_bank: bank, root, snapshot_config: None, - last_snapshot: *names.last().unwrap(), + last_snapshot: *names + .last() + .expect("untarred snapshot should have at least one snapshot"), confidence: HashMap::new(), }) } @@ -551,22 +548,17 @@ mod tests { ); } - fn restore_from_snapshot( - genesis_block: &GenesisBlock, - bank_forks: BankForks, - account_paths: Option, - last_slot: u64, - ) { - let snapshot_path = bank_forks + fn restore_from_snapshot(bank_forks: BankForks, account_paths: String, last_slot: u64) { + let (snapshot_path, snapshot_package_output_path) = bank_forks .snapshot_config .as_ref() - .map(|c| &c.snapshot_path) + .map(|c| (&c.snapshot_path, &c.snapshot_package_output_path)) .unwrap(); let new = BankForks::load_from_snapshot( - &genesis_block, account_paths, bank_forks.snapshot_config.as_ref().unwrap(), + snapshot_utils::get_snapshot_tar_path(snapshot_package_output_path), ) .unwrap(); @@ -595,7 +587,7 @@ mod tests { mint_keypair, .. } = create_genesis_block(10_000); - for index in 0..10 { + for index in 0..4 { let bank0 = Bank::new_with_paths( &genesis_block, Some(accounts_dir.path().to_str().unwrap().to_string()), @@ -609,7 +601,7 @@ mod tests { ); bank_forks.set_snapshot_config(snapshot_config.clone()); let bank0 = bank_forks.get(0).unwrap(); - snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0, 0).unwrap(); + snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0).unwrap(); for forks in 0..index { let bank = Bank::new_from_parent(&bank_forks[forks], &Pubkey::default(), forks + 1); let key1 = Keypair::new().pubkey(); @@ -621,13 +613,26 @@ mod tests { ); assert_eq!(bank.process_transaction(&tx), Ok(())); bank.freeze(); - snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, &bank, 0).unwrap(); + snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, &bank).unwrap(); bank_forks.insert(bank); } + // Generate a snapshot package for last bank + let last_bank = bank_forks.get(index.saturating_sub(1)).unwrap(); + let names: Vec<_> = (0..=index).collect(); + let snapshot_package = snapshot_utils::package_snapshot( + last_bank, + &names, + &snapshot_config.snapshot_path, + snapshot_utils::get_snapshot_tar_path( + &snapshot_config.snapshot_package_output_path, + ), + ) + .unwrap(); + SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap(); + restore_from_snapshot( - &genesis_block, bank_forks, - Some(accounts_dir.path().to_str().unwrap().to_string()), + accounts_dir.path().to_str().unwrap().to_string(), index, ); } @@ -663,7 +668,7 @@ mod tests { // Take snapshot of zeroth bank let bank0 = bank_forks.get(0).unwrap(); - snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0, 0).unwrap(); + snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0).unwrap(); // Create next MAX_CACHE_ENTRIES + 2 banks and snapshots. Every bank will get snapshotted // and the snapshot purging logic will run on every snapshot taken. This means the three diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 1e7288f4f3..3f76841d11 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -582,7 +582,7 @@ impl Replicator { return Err(Error::IO(::new( io::ErrorKind::Other, "unable to get recent blockhash, can't submit proof", - ))) + ))); } }; diff --git a/core/src/snapshot_package.rs b/core/src/snapshot_package.rs index 9c2eac57c0..1e30794fe7 100644 --- a/core/src/snapshot_package.rs +++ b/core/src/snapshot_package.rs @@ -15,7 +15,7 @@ use std::time::Duration; pub type SnapshotPackageSender = Sender; pub type SnapshotPackageReceiver = Receiver; -pub const TAR_SNAPSHOT_DIR: &str = "snapshots"; +pub const TAR_SNAPSHOTS_DIR: &str = "snapshots"; pub const TAR_ACCOUNTS_DIR: &str = "accounts"; pub struct SnapshotPackage { @@ -57,7 +57,7 @@ impl SnapshotPackagerService { if exit.load(Ordering::Relaxed) { break; } - if let Err(e) = Self::package_snapshots(&snapshot_package_receiver) { + if let Err(e) = Self::run(&snapshot_package_receiver) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), @@ -71,9 +71,7 @@ impl SnapshotPackagerService { } } - pub fn package_snapshots(snapshot_receiver: &SnapshotPackageReceiver) -> Result<()> { - let snapshot_package = snapshot_receiver.recv_timeout(Duration::from_secs(1))?; - + pub fn package_snapshots(snapshot_package: &SnapshotPackage) -> Result<()> { // Create the tar builder let tar_gz = tempfile::Builder::new() .prefix("new_state") @@ -84,7 +82,7 @@ impl SnapshotPackagerService { let mut tar = tar::Builder::new(enc); // Create the list of paths to compress, starting with the snapshots - let tar_output_snapshots_dir = Path::new(&TAR_SNAPSHOT_DIR); + let tar_output_snapshots_dir = Path::new(&TAR_SNAPSHOTS_DIR); // Add the snapshots to the tarball and delete the directory of hardlinks to the snapshots // that was created to persist those snapshots while this package was being created @@ -117,6 +115,12 @@ impl SnapshotPackagerService { fs::hard_link(&temp_tar_path, &snapshot_package.tar_output_file)?; Ok(()) } + + fn run(snapshot_receiver: &SnapshotPackageReceiver) -> Result<()> { + let snapshot_package = snapshot_receiver.recv_timeout(Duration::from_secs(1))?; + Self::package_snapshots(&snapshot_package)?; + Ok(()) + } } impl Service for SnapshotPackagerService { @@ -133,14 +137,12 @@ mod tests { use crate::snapshot_utils; use std::fs::OpenOptions; use std::io::Write; - use std::sync::mpsc::channel; use tempfile::TempDir; #[test] fn test_package_snapshots() { // Create temprorary placeholder directory for all test files let temp_dir = TempDir::new().unwrap(); - let (sender, receiver) = channel(); let accounts_dir = temp_dir.path().join("accounts"); let snapshots_dir = temp_dir.path().join("snapshots"); let snapshot_package_output_path = temp_dir.path().join("snapshots_output"); @@ -184,10 +186,9 @@ mod tests { storage_entries.clone(), output_tar_path.clone(), ); - sender.send(snapshot_package).unwrap(); // Make tarball from packageable snapshot - SnapshotPackagerService::package_snapshots(&receiver).unwrap(); + SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap(); // Check tarball is correct snapshot_utils::tests::verify_snapshot_tar(output_tar_path, snapshots_dir, accounts_dir); diff --git a/core/src/snapshot_utils.rs b/core/src/snapshot_utils.rs index 5844c108b6..0ba5400bd6 100644 --- a/core/src/snapshot_utils.rs +++ b/core/src/snapshot_utils.rs @@ -2,7 +2,7 @@ use crate::result::{Error, Result}; use crate::snapshot_package::SnapshotPackage; use bincode::{deserialize_from, serialize_into}; use flate2::read::GzDecoder; -use solana_runtime::bank::{Bank, StatusCacheRc}; +use solana_runtime::bank::Bank; use std::fs; use std::fs::File; use std::io::{BufReader, BufWriter, Error as IOError, ErrorKind}; @@ -33,7 +33,7 @@ pub fn package_snapshot, Q: AsRef>( let account_storage_entries = bank.rc.get_storage_entries(); // Create a snapshot package - trace!( + info!( "Snapshot for bank: {} has {} account storage entries", slot, account_storage_entries.len() @@ -60,7 +60,8 @@ pub fn get_snapshot_names>(snapshot_path: P) -> Vec { entry.ok().and_then(|e| { e.path() .file_name() - .and_then(|n| n.to_str().map(|s| s.parse::().unwrap())) + .and_then(|n| n.to_str().map(|s| s.parse::().ok())) + .unwrap_or(None) }) }) .collect::>(); @@ -69,13 +70,13 @@ pub fn get_snapshot_names>(snapshot_path: P) -> Vec { names } -pub fn add_snapshot>(snapshot_path: P, bank: &Bank, root: u64) -> Result<()> { +pub fn add_snapshot>(snapshot_path: P, bank: &Bank) -> Result<()> { let slot = bank.slot(); let slot_snapshot_dir = get_bank_snapshot_dir(snapshot_path, slot); fs::create_dir_all(slot_snapshot_dir.clone()).map_err(Error::from)?; let snapshot_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot)); - trace!( + info!( "creating snapshot {}, path: {:?}", bank.slot(), snapshot_file_path @@ -84,20 +85,12 @@ pub fn add_snapshot>(snapshot_path: P, bank: &Bank, root: u64) -> let mut stream = BufWriter::new(file); // Create the snapshot - serialize_into(&mut stream, &*bank).map_err(|_| get_io_error("serialize bank error"))?; - let mut parent_slot: u64 = 0; - if let Some(parent_bank) = bank.parent() { - parent_slot = parent_bank.slot(); - } - serialize_into(&mut stream, &parent_slot) - .map_err(|_| get_io_error("serialize bank parent error"))?; - serialize_into(&mut stream, &root).map_err(|_| get_io_error("serialize root error"))?; - serialize_into(&mut stream, &bank.src) - .map_err(|_| get_io_error("serialize bank status cache error"))?; - serialize_into(&mut stream, &bank.rc) - .map_err(|_| get_io_error("serialize bank accounts error"))?; + serialize_into(&mut stream, &*bank).map_err(|e| get_io_error(&e.to_string()))?; + serialize_into(&mut stream, &bank.rc).map_err(|e| get_io_error(&e.to_string()))?; + // TODO: Add status cache serialization code + /*serialize_into(&mut stream, &bank.src).map_err(|e| get_io_error(&e.to_string()))?;*/ - trace!( + info!( "successfully created snapshot {}, path: {:?}", bank.slot(), snapshot_file_path @@ -112,44 +105,50 @@ pub fn remove_snapshot>(slot: u64, snapshot_path: P) -> Result<() Ok(()) } -pub fn load_snapshots>( - names: &[u64], - bank0: &mut Bank, - bank_maps: &mut Vec<(u64, u64, Bank)>, - status_cache_rc: &StatusCacheRc, +pub fn bank_from_snapshots( + local_account_paths: String, snapshot_path: P, -) -> Option { - let mut bank_root: Option = None; + append_vecs_path: Q, +) -> Result +where + P: AsRef, + Q: AsRef, +{ + // Rebuild the last root bank + let names = get_snapshot_names(&snapshot_path); + let last_root = names + .last() + .ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?; + let snapshot_file_name = get_snapshot_file_name(*last_root); + let snapshot_dir = get_bank_snapshot_dir(&snapshot_path, *last_root); + let snapshot_file_path = snapshot_dir.join(&snapshot_file_name); + info!("Load from {:?}", snapshot_file_path); + let file = File::open(snapshot_file_path)?; + let mut stream = BufReader::new(file); + let bank: Bank = deserialize_from(&mut stream).map_err(|e| get_io_error(&e.to_string()))?; - for (i, bank_slot) in names.iter().rev().enumerate() { + // Rebuild accounts + bank.rc + .accounts_from_stream(&mut stream, local_account_paths, append_vecs_path)?; + + for bank_slot in names.iter().rev() { let snapshot_file_name = get_snapshot_file_name(*bank_slot); let snapshot_dir = get_bank_snapshot_dir(&snapshot_path, *bank_slot); let snapshot_file_path = snapshot_dir.join(snapshot_file_name.clone()); - trace!("Load from {:?}", snapshot_file_path); - let file = File::open(snapshot_file_path); - if file.is_err() { - warn!("Snapshot file open failed for {}", bank_slot); - continue; - } - let file = file.unwrap(); + let file = File::open(snapshot_file_path)?; let mut stream = BufReader::new(file); - let bank: Result = - deserialize_from(&mut stream).map_err(|_| get_io_error("deserialize bank error")); - let slot: Result = deserialize_from(&mut stream) - .map_err(|_| get_io_error("deserialize bank parent error")); - let parent_slot = if slot.is_ok() { slot.unwrap() } else { 0 }; - let root: Result = - deserialize_from(&mut stream).map_err(|_| get_io_error("deserialize root error")); - let status_cache: Result = deserialize_from(&mut stream) - .map_err(|_| get_io_error("deserialize bank status cache error")); - if bank_root.is_none() && bank0.rc.update_from_stream(&mut stream).is_ok() { - bank_root = Some(root.unwrap()); - } + let _bank: Result = + deserialize_from(&mut stream).map_err(|e| get_io_error(&e.to_string())); + + // TODO: Uncomment and deserialize status cache here + + /*let status_cache: Result = deserialize_from(&mut stream) + .map_err(|e| get_io_error(&e.to_string())); if bank_root.is_some() { match bank { Ok(v) => { if status_cache.is_ok() { - let status_cache = status_cache.unwrap(); + let status_cache = status_cache?; status_cache_rc.append(&status_cache); // On the last snapshot, purge all outdated status cache // entries @@ -164,10 +163,10 @@ pub fn load_snapshots>( } } else { warn!("Load snapshot rc failed for {}", bank_slot); - } + }*/ } - bank_root + Ok(bank) } pub fn get_snapshot_tar_path>(snapshot_output_dir: P) -> PathBuf { @@ -228,7 +227,7 @@ fn get_io_error(error: &str) -> Error { #[cfg(test)] pub mod tests { use super::*; - use crate::snapshot_package::{TAR_ACCOUNTS_DIR, TAR_SNAPSHOT_DIR}; + use crate::snapshot_package::{TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR}; use tempfile::TempDir; pub fn verify_snapshot_tar( @@ -245,7 +244,7 @@ pub mod tests { untar_snapshot_in(snapshot_tar, &unpack_dir).unwrap(); // Check snapshots are the same - let unpacked_snapshots = unpack_dir.join(&TAR_SNAPSHOT_DIR); + let unpacked_snapshots = unpack_dir.join(&TAR_SNAPSHOTS_DIR); assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap()); // Check the account entries are the same diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 7cab2762de..8294351191 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -205,7 +205,7 @@ impl StorageStage { ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { - break + break; } Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), _ => info!("Error from process_entries: {:?}", e), diff --git a/core/src/validator.rs b/core/src/validator.rs index 20459a2c7e..84db83ed4c 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -16,6 +16,7 @@ use crate::rpc_pubsub_service::PubSubService; use crate::rpc_service::JsonRpcService; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; +use crate::snapshot_utils; use crate::storage_stage::StorageState; use crate::tpu::Tpu; use crate::tvu::{Sockets, Tvu}; @@ -314,21 +315,36 @@ fn get_bank_forks( let (mut bank_forks, bank_forks_info, leader_schedule_cache) = { let mut result = None; if snapshot_config.is_some() { - let bank_forks = BankForks::load_from_snapshot( - &genesis_block, - account_paths.clone(), - snapshot_config.as_ref().unwrap(), + let snapshot_config = snapshot_config.as_ref().unwrap(); + + // Get the path to the tar + let tar = snapshot_utils::get_snapshot_tar_path( + &snapshot_config.snapshot_package_output_path(), ); - match bank_forks { - Ok(v) => { - let bank = &v.working_bank(); - let fork_info = BankForksInfo { - bank_slot: bank.slot(), - entry_height: bank.tick_height(), - }; - result = Some((v, vec![fork_info], LeaderScheduleCache::new_from_bank(bank))); - } - Err(_) => warn!("Failed to load from snapshot, fallback to load from ledger"), + + // Check that the snapshot tar exists, try to load the snapshot if it does + if tar.exists() { + // Fail hard here if snapshot fails to load, don't silently continue + let bank_forks = BankForks::load_from_snapshot( + //&genesis_block, + account_paths + .clone() + .expect("Account paths not present when booting from snapshot"), + snapshot_config, + tar, + ) + .expect("Load from snapshot failed"); + + let bank = &bank_forks.working_bank(); + let fork_info = BankForksInfo { + bank_slot: bank.slot(), + entry_height: bank.tick_height(), + }; + result = Some(( + bank_forks, + vec![fork_info], + LeaderScheduleCache::new_from_bank(bank), + )); } } diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 8b5431216a..75b9618c88 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -13,6 +13,7 @@ bincode = "1.1.4" bv = { version = "0.11.0", features = ["serde"] } byteorder = "1.3.2" fnv = "1.0.6" +fs_extra = "1.1.0" hashbrown = "0.2.0" lazy_static = "1.3.0" libc = "0.2.58" @@ -37,6 +38,7 @@ solana-storage-api = { path = "../programs/storage_api", version = "0.18.0-pre1" solana-vote-api = { path = "../programs/vote_api", version = "0.18.0-pre1" } solana-vote-program = { path = "../programs/vote_program", version = "0.18.0-pre1" } sys-info = "0.5.7" +tempfile = "3.1.0" [lib] crate-type = ["lib"] diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index b20c8c5a26..4aba7dca73 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -20,7 +20,8 @@ use solana_sdk::sysvar; use solana_sdk::transaction::Result; use solana_sdk::transaction::{Transaction, TransactionError}; use std::collections::{HashMap, HashSet}; -use std::io::{BufReader, Read}; +use std::io::{BufReader, Error as IOError, Read}; +use std::path::Path; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -64,11 +65,14 @@ impl Accounts { } } - pub fn update_from_stream( + pub fn accounts_from_stream>( &self, stream: &mut BufReader, - ) -> std::result::Result<(), std::io::Error> { - self.accounts_db.update_from_stream(stream) + local_paths: String, + append_vecs_path: P, + ) -> std::result::Result<(), IOError> { + self.accounts_db + .accounts_from_stream(stream, local_paths, append_vecs_path) } fn load_tx_accounts( @@ -638,6 +642,8 @@ mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; + use crate::accounts_db::get_temp_accounts_paths; + use crate::accounts_db::tests::copy_append_vecs; use bincode::{serialize_into, serialized_size}; use rand::{thread_rng, Rng}; use solana_sdk::account::Account; @@ -650,6 +656,7 @@ mod tests { use std::io::Cursor; use std::sync::atomic::AtomicBool; use std::{thread, time}; + use tempfile::TempDir; fn load_accounts_with_fee( tx: Transaction, @@ -1153,7 +1160,8 @@ mod tests { #[test] fn test_accounts_serialize() { solana_logger::setup(); - let accounts = Accounts::new(None); + let (_accounts_dir, paths) = get_temp_accounts_paths(4).unwrap(); + let accounts = Accounts::new(Some(paths)); let mut pubkeys: Vec = vec![]; create_test_accounts(&accounts, &mut pubkeys, 100); @@ -1165,9 +1173,17 @@ mod tests { let mut writer = Cursor::new(&mut buf[..]); serialize_into(&mut writer, &*accounts.accounts_db).unwrap(); + let copied_accounts = TempDir::new().unwrap(); + + // Simulate obtaining a copy of the AppendVecs from a tarball + copy_append_vecs(&accounts.accounts_db, copied_accounts.path()).unwrap(); + let mut reader = BufReader::new(&buf[..]); - let daccounts = Accounts::new(Some(accounts.accounts_db.paths())); - assert!(daccounts.update_from_stream(&mut reader).is_ok()); + let (_accounts_dir, daccounts_paths) = get_temp_accounts_paths(2).unwrap(); + let daccounts = Accounts::new(Some(daccounts_paths.clone())); + assert!(daccounts + .accounts_from_stream(&mut reader, daccounts_paths, copied_accounts.path()) + .is_ok()); check_accounts(&daccounts, &pubkeys, 100); assert_eq!( accounts.hash_internal_state(0), diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 3a8a020f65..9164b73c0f 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -20,7 +20,8 @@ use crate::accounts_index::{AccountsIndex, Fork}; use crate::append_vec::{AppendVec, StorageMeta, StoredAccount}; -use bincode::{deserialize_from, serialize_into, serialized_size}; +use bincode::{deserialize_from, serialize_into}; +use fs_extra::dir::CopyOptions; use log::*; use rand::{thread_rng, Rng}; use rayon::prelude::*; @@ -33,17 +34,17 @@ use solana_sdk::account::{Account, LamportCredit}; use solana_sdk::pubkey::Pubkey; use std::collections::{HashMap, HashSet}; use std::fmt; -use std::fs::remove_dir_all; -use std::io::{BufReader, Cursor, Error, ErrorKind, Read}; +use std::io::{BufReader, Cursor, Error as IOError, ErrorKind, Read, Result as IOResult}; use std::path::Path; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use sys_info; +use tempfile::TempDir; pub const DEFAULT_FILE_SIZE: u64 = 4 * 1024 * 1024; pub const DEFAULT_NUM_THREADS: u32 = 8; -pub const DEFAULT_DIRS: &str = "0,1,2,3"; +pub const DEFAULT_NUM_DIRS: u32 = 4; #[derive(Debug, Default)] pub struct ErrorCounters { @@ -100,7 +101,6 @@ impl<'de> Visitor<'de> for AccountStorageVisitor { M: MapAccess<'de>, { let mut map = HashMap::new(); - while let Some((storage_id, storage_entry)) = access.next_entry()? { let storage_entry: AccountStorageEntry = storage_entry; let storage_fork_map = map @@ -177,7 +177,7 @@ pub struct AccountStorageEntry { impl AccountStorageEntry { pub fn new(path: &Path, fork_id: Fork, id: usize, file_size: u64) -> Self { - let tail = format!("{}.{}", fork_id, id); + let tail = AppendVec::new_relative_path(fork_id, id); let path = Path::new(path).join(&tail); let accounts = AppendVec::new(&path, true, file_size as usize); @@ -271,40 +271,31 @@ impl AccountStorageEntry { count_and_status.0 } + pub fn set_file>(&mut self, path: P) -> IOResult<()> { + self.accounts.set_file(path) + } + + pub fn get_relative_path(&self) -> Option { + AppendVec::get_relative_path(self.accounts.get_path()) + } + pub fn get_path(&self) -> PathBuf { self.accounts.get_path() } } -pub fn get_paths_vec(paths: &str) -> Vec { - paths.split(',').map(ToString::to_string).collect() +pub fn get_paths_vec(paths: &str) -> Vec { + paths.split(',').map(PathBuf::from).collect() } -#[derive(Debug)] -struct TempPaths { - pub paths: String, -} - -impl Drop for TempPaths { - fn drop(&mut self) { - let paths = get_paths_vec(&self.paths); - paths.iter().for_each(|p| { - let _ignored = remove_dir_all(p); - }); - } -} - -fn get_temp_accounts_path(paths: &str) -> TempPaths { - let paths = get_paths_vec(paths); - let out_dir = std::env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string()); - let rand = Pubkey::new_rand(); - let paths: Vec<_> = paths +pub fn get_temp_accounts_paths(count: u32) -> IOResult<(Vec, String)> { + let temp_dirs: IOResult> = (0..count).map(|_| TempDir::new()).collect(); + let temp_dirs = temp_dirs?; + let paths: Vec = temp_dirs .iter() - .map(|path| format!("{}/accounts_db/{}/{}", out_dir, rand, path)) + .map(|t| t.path().to_str().unwrap().to_owned()) .collect(); - TempPaths { - paths: paths.join(","), - } + Ok((temp_dirs, paths.join(","))) } // This structure handles the load/store of the accounts @@ -323,10 +314,10 @@ pub struct AccountsDB { write_version: AtomicUsize, /// Set of storage paths to pick from - paths: RwLock>, + paths: RwLock>, - /// Set of paths this accounts_db needs to hold/remove - temp_paths: Option, + /// Directory of paths this accounts_db needs to hold/remove + temp_paths: Option>, /// Starting file size of appendvecs file_size: u64, @@ -341,15 +332,13 @@ impl Default for AccountsDB { fn default() -> Self { let num_threads = sys_info::cpu_num().unwrap_or(DEFAULT_NUM_THREADS) as usize; - let temp_paths = get_temp_accounts_path(DEFAULT_DIRS); // make 4 directories by default - AccountsDB { accounts_index: RwLock::new(AccountsIndex::default()), storage: RwLock::new(AccountStorage(HashMap::new())), next_id: AtomicUsize::new(0), write_version: AtomicUsize::new(0), - paths: RwLock::new(get_paths_vec(&temp_paths.paths)), - temp_paths: Some(temp_paths), + paths: RwLock::new(vec![]), + temp_paths: None, file_size: DEFAULT_FILE_SIZE, thread_pool: rayon::ThreadPoolBuilder::new() .num_threads(num_threads) @@ -363,13 +352,20 @@ impl Default for AccountsDB { impl AccountsDB { pub fn new(paths: Option) -> Self { if let Some(paths) = paths { - AccountsDB { + Self { paths: RwLock::new(get_paths_vec(&paths)), temp_paths: None, - ..AccountsDB::default() + ..Self::default() } } else { - AccountsDB::default() + // Create a temprorary set of accounts directories, used primarily + // for testing + let (temp_dirs, paths) = get_temp_accounts_paths(DEFAULT_NUM_DIRS).unwrap(); + Self { + paths: RwLock::new(get_paths_vec(&paths)), + temp_paths: Some(temp_dirs), + ..Self::default() + } } } @@ -389,42 +385,87 @@ impl AccountsDB { } pub fn paths(&self) -> String { - self.paths.read().unwrap().join(",") + let paths: Vec = self + .paths + .read() + .unwrap() + .iter() + .map(|p| p.to_str().unwrap().to_owned()) + .collect(); + paths.join(",") } - pub fn update_from_stream( + pub fn accounts_from_stream>( &self, mut stream: &mut BufReader, - ) -> Result<(), std::io::Error> { - let _len: usize = deserialize_from(&mut stream) - .map_err(|_| AccountsDB::get_io_error("len deserialize error"))?; - *self.paths.write().unwrap() = deserialize_from(&mut stream) - .map_err(|_| AccountsDB::get_io_error("paths deserialize error"))?; - let mut storage: AccountStorage = deserialize_from(&mut stream) - .map_err(|_| AccountsDB::get_io_error("storage deserialize error"))?; + local_account_paths: String, + append_vecs_path: P, + ) -> Result<(), IOError> { + let _len: usize = + deserialize_from(&mut stream).map_err(|e| AccountsDB::get_io_error(&e.to_string()))?; + let storage: AccountStorage = + deserialize_from(&mut stream).map_err(|e| AccountsDB::get_io_error(&e.to_string()))?; + + // Remap the deserialized AppendVec paths to point to correct local paths + let local_account_paths = get_paths_vec(&local_account_paths); + let new_storage_map: Result, IOError> = storage + .0 + .into_iter() + .map(|(fork_id, mut fork_storage)| { + let mut new_fork_storage = HashMap::new(); + for (id, storage_entry) in fork_storage.drain() { + let path_index = thread_rng().gen_range(0, local_account_paths.len()); + let local_dir = &local_account_paths[path_index]; + + // Move the corresponding AppendVec from the snapshot into the directory pointed + // at by `local_dir` + let append_vec_relative_path = + AppendVec::new_relative_path(fork_id, storage_entry.id); + let append_vec_abs_path = + append_vecs_path.as_ref().join(&append_vec_relative_path); + let mut copy_options = CopyOptions::new(); + copy_options.overwrite = true; + fs_extra::move_items(&vec![append_vec_abs_path], &local_dir, ©_options) + .map_err(|e| AccountsDB::get_io_error(&e.to_string()))?; + + // Notify the AppendVec of the new file location + let local_path = local_dir.join(append_vec_relative_path); + let mut u_storage_entry = Arc::try_unwrap(storage_entry).unwrap(); + u_storage_entry + .set_file(local_path) + .map_err(|e| AccountsDB::get_io_error(&e.to_string()))?; + new_fork_storage.insert(id, Arc::new(u_storage_entry)); + } + Ok((fork_id, new_fork_storage)) + }) + .collect(); + + let new_storage_map = new_storage_map?; + let storage = AccountStorage(new_storage_map); let version: u64 = deserialize_from(&mut stream) .map_err(|_| AccountsDB::get_io_error("write version deserialize error"))?; - let mut ids: Vec = storage + // Process deserialized data, set necessary fields in self + *self.paths.write().unwrap() = local_account_paths; + let max_id: usize = *storage .0 .values() .flat_map(HashMap::keys) - .cloned() - .collect(); - ids.sort(); + .max() + .expect("At least one storage entry must exist from deserializing stream"); { let mut stores = self.storage.write().unwrap(); - if let Some((_, store0)) = storage.0.remove_entry(&0) { + /*if let Some((_, store0)) = storage.0.remove_entry(&0) { let fork_storage0 = stores.0.entry(0).or_insert_with(HashMap::new); for (id, store) in store0.iter() { fork_storage0.insert(*id, store.clone()); } - } + }*/ stores.0.extend(storage.0); } - self.next_id - .store(ids[ids.len() - 1] + 1, Ordering::Relaxed); + + self.next_id.store(max_id + 1, Ordering::Relaxed); self.write_version .fetch_add(version as usize, Ordering::Relaxed); self.generate_index(); @@ -757,6 +798,15 @@ impl AccountsDB { self.accounts_index.write().unwrap().add_root(fork) } + pub fn get_storage_entries(&self) -> Vec> { + let r_storage = self.storage.read().unwrap(); + r_storage + .0 + .values() + .flat_map(|fork_store| fork_store.values().cloned()) + .collect() + } + fn merge( dest: &mut HashMap, source: &HashMap, @@ -771,9 +821,9 @@ impl AccountsDB { } } - fn get_io_error(error: &str) -> Error { + fn get_io_error(error: &str) -> IOError { warn!("AccountsDB error: {:?}", error); - Error::new(ErrorKind::Other, error) + IOError::new(ErrorKind::Other, error) } fn generate_index(&self) { @@ -781,7 +831,6 @@ impl AccountsDB { let mut forks: Vec = storage.0.keys().cloned().collect(); forks.sort(); let mut accounts_index = self.accounts_index.write().unwrap(); - accounts_index.roots.insert(0); for fork_id in forks.iter() { let mut accumulator: Vec> = self .scan_account_storage( @@ -823,13 +872,8 @@ impl Serialize for AccountsDB { { use serde::ser::Error; let storage = self.storage.read().unwrap(); - let len = serialized_size(&self.paths).unwrap() - + serialized_size(&*storage).unwrap() - + std::mem::size_of::() as u64; - let mut buf = vec![0u8; len as usize]; - let mut wr = Cursor::new(&mut buf[..]); + let mut wr = Cursor::new(vec![]); let version: u64 = self.write_version.load(Ordering::Relaxed) as u64; - serialize_into(&mut wr, &self.paths).map_err(Error::custom)?; serialize_into(&mut wr, &*storage).map_err(Error::custom)?; serialize_into(&mut wr, &version).map_err(Error::custom)?; let len = wr.position() as usize; @@ -838,13 +882,15 @@ impl Serialize for AccountsDB { } #[cfg(test)] -mod tests { +pub mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; use bincode::{serialize_into, serialized_size}; use maplit::hashmap; use rand::{thread_rng, Rng}; use solana_sdk::account::Account; + use std::fs; + use tempfile::TempDir; #[test] fn test_accountsdb_add_root() { @@ -1126,8 +1172,8 @@ mod tests { #[test] fn test_account_one() { - let paths = get_temp_accounts_path("one"); - let db = AccountsDB::new(Some(paths.paths.clone())); + let (_accounts_dirs, paths) = get_temp_accounts_paths(1).unwrap(); + let db = AccountsDB::new(Some(paths)); let mut pubkeys: Vec = vec![]; create_account(&db, &mut pubkeys, 0, 1, 0, 0); let ancestors = vec![(0, 0)].into_iter().collect(); @@ -1139,8 +1185,8 @@ mod tests { #[test] fn test_account_many() { - let paths = get_temp_accounts_path("many0,many1"); - let db = AccountsDB::new(Some(paths.paths.clone())); + let (_accounts_dirs, paths) = get_temp_accounts_paths(2).unwrap(); + let db = AccountsDB::new(Some(paths)); let mut pubkeys: Vec = vec![]; create_account(&db, &mut pubkeys, 0, 100, 0, 0); check_accounts(&db, &pubkeys, 0, 100, 1); @@ -1157,9 +1203,9 @@ mod tests { #[test] fn test_account_grow_many() { - let paths = get_temp_accounts_path("many2,many3"); + let (_accounts_dir, paths) = get_temp_accounts_paths(2).unwrap(); let size = 4096; - let accounts = AccountsDB::new_sized(Some(paths.paths.clone()), size); + let accounts = AccountsDB::new_sized(Some(paths), size); let mut keys = vec![]; for i in 0..9 { let key = Pubkey::new_rand(); @@ -1335,12 +1381,22 @@ mod tests { let mut reader = BufReader::new(&buf[..]); let daccounts = AccountsDB::new(None); - assert!(daccounts.update_from_stream(&mut reader).is_ok()); + let local_paths = daccounts.paths(); + let copied_accounts = TempDir::new().unwrap(); + // Simulate obtaining a copy of the AppendVecs from a tarball + copy_append_vecs(&accounts, copied_accounts.path()).unwrap(); + daccounts + .accounts_from_stream(&mut reader, local_paths, copied_accounts.path()) + .unwrap(); assert_eq!( daccounts.write_version.load(Ordering::Relaxed), accounts.write_version.load(Ordering::Relaxed) ); - assert_eq!(daccounts.paths(), accounts.paths()); + + assert_eq!( + daccounts.next_id.load(Ordering::Relaxed), + accounts.next_id.load(Ordering::Relaxed) + ); check_accounts(&daccounts, &pubkeys, 0, 100, 2); check_accounts(&daccounts, &pubkeys1, 1, 10, 1); @@ -1439,4 +1495,23 @@ mod tests { let ret = db.load_slow(&ancestors, &key).unwrap(); assert_eq!(ret.0.data.len(), data_len); } + + pub fn copy_append_vecs>( + accounts_db: &AccountsDB, + output_dir: P, + ) -> IOResult<()> { + let storage_entries = accounts_db.get_storage_entries(); + for storage in storage_entries { + let storage_path = storage.get_path(); + let output_path = output_dir.as_ref().join( + storage_path + .file_name() + .expect("Invalid AppendVec file path"), + ); + + fs::copy(storage_path, output_path)?; + } + + Ok(()) + } } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index cd2ea42d39..4be78ff006 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1,4 +1,3 @@ -use log::*; use solana_sdk::pubkey::Pubkey; use std::collections::{HashMap, HashSet}; use std::sync::{RwLock, RwLockReadGuard}; @@ -37,7 +36,6 @@ impl AccountsIndex { let mut rv = None; for (i, (fork, _t)) in list.iter().rev().enumerate() { if *fork >= max && (ancestors.get(fork).is_some() || self.is_root(*fork)) { - trace!("GET {} {:?} i: {}", fork, ancestors, i); rv = Some((list.len() - 1) - i); max = *fork; } diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index 5bedc3e731..b06f654315 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -5,6 +5,7 @@ use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; use std::fmt; use std::fs::{create_dir_all, remove_file, OpenOptions}; +use std::io; use std::io::{Cursor, Seek, SeekFrom, Write}; use std::mem; use std::path::{Path, PathBuf}; @@ -151,6 +152,28 @@ impl AppendVec { self.file_size } + // Get the file path relative to the top level accounts directory + pub fn get_relative_path>(append_vec_path: P) -> Option { + append_vec_path.as_ref().file_name().map(PathBuf::from) + } + + pub fn new_relative_path(fork_id: u64, id: usize) -> PathBuf { + PathBuf::from(&format!("{}.{}", fork_id, id)) + } + + pub fn set_file>(&mut self, path: P) -> io::Result<()> { + self.path = path.as_ref().to_path_buf(); + let data = OpenOptions::new() + .read(true) + .write(true) + .create(false) + .open(&path)?; + + let map = unsafe { MmapMut::map_mut(&data)? }; + self.map = map; + Ok(()) + } + fn get_slice(&self, offset: usize, size: usize) -> Option<(&[u8], usize)> { let len = self.len(); @@ -370,25 +393,21 @@ impl<'a> serde::de::Visitor<'a> for AppendVecVisitor { } #[allow(clippy::mutex_atomic)] + // Note this does not initialize a valid Mmap in the AppendVec, needs to be done + // externally fn visit_bytes(self, data: &[u8]) -> std::result::Result where E: serde::de::Error, { use serde::de::Error; let mut rd = Cursor::new(&data[..]); + // TODO: this path does not need to be serialized, can remove let path: PathBuf = deserialize_from(&mut rd).map_err(Error::custom)?; let current_len: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; let offset: usize = deserialize_from(&mut rd).map_err(Error::custom)?; - let data = OpenOptions::new() - .read(true) - .write(true) - .create(false) - .open(&path) - .map_err(|e| Error::custom(e.to_string()))?; - - let map = unsafe { MmapMut::map_mut(&data).map_err(|e| Error::custom(e.to_string()))? }; + let map = MmapMut::map_anon(1).map_err(|e| Error::custom(e.to_string()))?; Ok(AppendVec { path, map, @@ -492,21 +511,37 @@ pub mod tests { assert_eq!(av.get_account_test(index2).unwrap(), account2); assert_eq!(av.get_account_test(index1).unwrap(), account1); - let mut buf = vec![0u8; serialized_size(&av).unwrap() as usize]; - let mut writer = Cursor::new(&mut buf[..]); + let append_vec_path = &av.path; + + // Serialize the AppendVec + let mut writer = Cursor::new(vec![]); serialize_into(&mut writer, &av).unwrap(); - let mut reader = Cursor::new(&mut buf[..]); - let dav: AppendVec = deserialize_from(&mut reader).unwrap(); + // Deserialize the AppendVec + let buf = writer.into_inner(); + let mut reader = Cursor::new(&buf[..]); + let mut dav: AppendVec = deserialize_from(&mut reader).unwrap(); + // Set the AppendVec path + dav.set_file(append_vec_path).unwrap(); assert_eq!(dav.get_account_test(index2).unwrap(), account2); assert_eq!(dav.get_account_test(index1).unwrap(), account1); drop(dav); - // dropping dav above blows away underlying file's directory entry, - // which is what we're testing next. - let mut reader = Cursor::new(&mut buf[..]); - let dav: Result> = deserialize_from(&mut reader); - assert!(dav.is_err()); + // Dropping dav above blows away underlying file's directory entry, so + // trying to set the file will fail + let mut reader = Cursor::new(&buf[..]); + let mut dav: AppendVec = deserialize_from(&mut reader).unwrap(); + assert!(dav.set_file(append_vec_path).is_err()); + } + + #[test] + fn test_relative_path() { + let relative_path = AppendVec::new_relative_path(0, 2); + let full_path = Path::new("/tmp").join(&relative_path); + assert_eq!( + relative_path, + AppendVec::get_relative_path(full_path).unwrap() + ); } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 01c202089b..c6e92c2db9 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -45,7 +45,8 @@ use solana_sdk::transaction::{Result, Transaction, TransactionError}; use std::cmp; use std::collections::HashMap; use std::fmt; -use std::io::{BufReader, Cursor, Read}; +use std::io::{BufReader, Cursor, Error as IOError, Read}; +use std::path::Path; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock, RwLockReadGuard}; @@ -63,8 +64,8 @@ pub struct BankRc { } impl BankRc { - pub fn new(account_paths: Option, id: AppendVecId) -> Self { - let accounts = Accounts::new(account_paths); + pub fn new(account_paths: String, id: AppendVecId) -> Self { + let accounts = Accounts::new(Some(account_paths)); accounts .accounts_db .next_id @@ -75,25 +76,25 @@ impl BankRc { } } - pub fn update_from_stream( + pub fn accounts_from_stream>( &self, mut stream: &mut BufReader, - ) -> std::result::Result<(), std::io::Error> { - let _len: usize = deserialize_from(&mut stream) - .map_err(|_| BankRc::get_io_error("len deserialize error"))?; - self.accounts.update_from_stream(stream) + local_paths: String, + append_vecs_path: P, + ) -> std::result::Result<(), IOError> { + let _len: usize = + deserialize_from(&mut stream).map_err(|e| BankRc::get_io_error(&e.to_string()))?; + self.accounts + .accounts_from_stream(stream, local_paths, append_vecs_path)?; + + Ok(()) } pub fn get_storage_entries(&self) -> Vec> { - let r_storage = self.accounts.accounts_db.storage.read().unwrap(); - r_storage - .0 - .values() - .flat_map(|fork_store| fork_store.values().cloned()) - .collect() + self.accounts.accounts_db.get_storage_entries() } - fn get_io_error(error: &str) -> std::io::Error { + fn get_io_error(error: &str) -> IOError { warn!("BankRc error: {:?}", error); std::io::Error::new(std::io::ErrorKind::Other, error) } @@ -105,9 +106,7 @@ impl Serialize for BankRc { S: serde::ser::Serializer, { use serde::ser::Error; - let len = serialized_size(&*self.accounts.accounts_db).unwrap(); - let mut buf = vec![0u8; len as usize]; - let mut wr = Cursor::new(&mut buf[..]); + let mut wr = Cursor::new(Vec::new()); serialize_into(&mut wr, &*self.accounts.accounts_db).map_err(Error::custom)?; let len = wr.position() as usize; serializer.serialize_bytes(&wr.into_inner()[..len]) @@ -393,7 +392,7 @@ impl Bank { pub fn create_with_genesis( genesis_block: &GenesisBlock, - account_paths: Option, + account_paths: String, status_cache_rc: &StatusCacheRc, id: AppendVecId, ) -> Self { @@ -1472,9 +1471,10 @@ impl Bank { let dbhq = dbank.blockhash_queue.read().unwrap(); assert_eq!(*bhq, *dbhq); - let sc = self.src.status_cache.read().unwrap(); + // TODO: Uncomment once status cache serialization is done + /*let sc = self.src.status_cache.read().unwrap(); let dsc = dbank.src.status_cache.read().unwrap(); - assert_eq!(*sc, *dsc); + assert_eq!(*sc, *dsc);*/ assert_eq!( self.rc.accounts.hash_internal_state(self.slot), dbank.rc.accounts.hash_internal_state(dbank.slot) @@ -1498,6 +1498,8 @@ impl Drop for Bank { #[cfg(test)] mod tests { use super::*; + use crate::accounts_db::get_temp_accounts_paths; + use crate::accounts_db::tests::copy_append_vecs; use crate::epoch_schedule::MINIMUM_SLOTS_PER_EPOCH; use crate::genesis_utils::{ create_genesis_block_with_leader, GenesisBlockInfo, BOOTSTRAP_LEADER_LAMPORTS, @@ -1517,6 +1519,7 @@ mod tests { use solana_vote_api::vote_state::{VoteState, MAX_LOCKOUT_HISTORY}; use std::io::Cursor; use std::time::Duration; + use tempfile::TempDir; #[test] fn test_bank_new() { @@ -2804,8 +2807,20 @@ mod tests { let mut rdr = Cursor::new(&buf[..]); let mut dbank: Bank = deserialize_from(&mut rdr).unwrap(); let mut reader = BufReader::new(&buf[rdr.position() as usize..]); - dbank.set_bank_rc(&BankRc::new(None, 0), &StatusCacheRc::default()); - assert!(dbank.rc.update_from_stream(&mut reader).is_ok()); + + // Create a new set of directories for this bank's accounts + let (_accounts_dir, dbank_paths) = get_temp_accounts_paths(4).unwrap();; + dbank.set_bank_rc( + &BankRc::new(dbank_paths.clone(), 0), + &StatusCacheRc::default(), + ); + // Create a directory to simulate AppendVecs unpackaged from a snapshot tar + let copied_accounts = TempDir::new().unwrap(); + copy_append_vecs(&bank.rc.accounts.accounts_db, copied_accounts.path()).unwrap(); + dbank + .rc + .accounts_from_stream(&mut reader, dbank_paths, copied_accounts.path()) + .unwrap(); assert_eq!(dbank.get_balance(&key.pubkey()), 10); bank.compare_bank(&dbank); } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 3642fa6b88..e4be3bb4fd 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -32,3 +32,6 @@ extern crate solana_bpf_loader_program; #[macro_use] extern crate serde_derive; + +extern crate fs_extra; +extern crate tempfile;