From 7fd879b417d09b3021a4afa1b5070f76a3533774 Mon Sep 17 00:00:00 2001 From: Sathish Ambley Date: Wed, 5 Jun 2019 21:51:44 -0700 Subject: [PATCH] Restart validator nodes from snapshots --- Cargo.lock | 1 + core/src/bank_forks.rs | 64 ++++++++++--------- core/src/validator.rs | 3 +- multinode-demo/fullnode.sh | 24 +++----- runtime/Cargo.toml | 1 + runtime/src/accounts.rs | 29 +++++---- runtime/src/accounts_db.rs | 123 ++++++++++++++++--------------------- runtime/src/append_vec.rs | 32 ++++++++-- runtime/src/bank.rs | 90 ++++++++++++++------------- 9 files changed, 193 insertions(+), 174 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 16c8080cc..4a5392e15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2653,6 +2653,7 @@ dependencies = [ "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "hashbrown 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)", "libloading 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index 24757e2ec..36506dbf1 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -225,20 +225,18 @@ impl BankForks { } fn load_snapshots( - genesis_block: &GenesisBlock, names: &[u64], + bank0: &mut Bank, bank_maps: &mut Vec<(u64, u64, Bank)>, status_cache_rc: &StatusCacheRc, snapshot_path: &Option, - ) -> Option<(BankRc, u64)> { + ) -> Option { let path = BankForks::get_snapshot_path(snapshot_path); - let mut bank_rc: Option<(BankRc, u64)> = None; + let mut bank_root: Option = None; - println!("names: {:?}", names); for bank_slot in names.iter().rev() { let bank_path = format!("{}", bank_slot); let bank_file_path = path.join(bank_path.clone()); - println!("Load from {:?}", bank_file_path); info!("Load from {:?}", bank_file_path); let file = File::open(bank_file_path); if file.is_err() { @@ -256,14 +254,10 @@ impl BankForks { .map_err(|_| BankForks::get_io_error("deserialize root error")); let status_cache: Result = deserialize_from(&mut stream) .map_err(|_| BankForks::get_io_error("deserialize bank status cache error")); - if bank_rc.is_none() { - let rc: Result = deserialize_from(&mut stream) - .map_err(|_| BankForks::get_io_error("deserialize bank accounts error")); - if rc.is_ok() { - bank_rc = Some((rc.unwrap(), root.unwrap())); - } + if bank_root.is_none() && bank0.rc.update_from_stream(&mut stream).is_ok() { + bank_root = Some(root.unwrap()); } - if bank_rc.is_some() { + if bank_root.is_some() { match bank { Ok(v) => { if status_cache.is_ok() { @@ -278,10 +272,8 @@ impl BankForks { warn!("Load snapshot rc failed for {}", bank_slot); } } - let bank0 = Bank::new(&genesis_block); - bank0.freeze(); - bank_maps.push((0, 0, bank0)); - bank_rc + + bank_root } fn setup_banks( @@ -317,7 +309,11 @@ impl BankForks { (banks, slots, last_slot) } - pub fn load_from_snapshot(genesis_block: &GenesisBlock, snapshot_path: &Option) -> Result { + pub fn load_from_snapshot( + genesis_block: &GenesisBlock, + account_paths: Option, + snapshot_path: &Option, + ) -> Result { let path = BankForks::get_snapshot_path(snapshot_path); let paths = fs::read_dir(path)?; let mut names = paths @@ -330,21 +326,27 @@ impl BankForks { }) .collect::>(); - println!("names before: {:?}", names); - // names.retain(|&x| x != 0); - println!("names after : {:?}", names); names.sort(); let mut bank_maps = vec![]; let status_cache_rc = StatusCacheRc::default(); - let rc = BankForks::load_snapshots(&genesis_block, &names, &mut bank_maps, &status_cache_rc, snapshot_path); - if bank_maps.is_empty() || rc.is_none() { + let mut bank0 = + Bank::create_with_genesis(&genesis_block, account_paths.clone(), &status_cache_rc); + bank0.freeze(); + let bank_root = BankForks::load_snapshots( + &names, + &mut bank0, + &mut bank_maps, + &status_cache_rc, + snapshot_path, + ); + if bank_maps.is_empty() || bank_root.is_none() { BankForks::remove_snapshot(0, snapshot_path); return Err(Error::new(ErrorKind::Other, "no snapshots loaded")); } - let (bank_rc, root) = rc.unwrap(); + let root = bank_root.unwrap(); let (banks, slots, last_slot) = - BankForks::setup_banks(&mut bank_maps, &bank_rc, &status_cache_rc); + BankForks::setup_banks(&mut bank_maps, &bank0.rc, &status_cache_rc); let working_bank = banks[&last_slot].clone(); Ok(BankForks { banks, @@ -486,8 +488,15 @@ mod tests { } } - fn restore_from_snapshot(genesis_block: &GenesisBlock, bank_forks: BankForks, last_slot: u64) { - let new = BankForks::load_from_snapshot(&genesis_block, &bank_forks.snapshot_path).unwrap(); + fn restore_from_snapshot( + genesis_block: &GenesisBlock, + bank_forks: BankForks, + account_paths: Option, + last_slot: u64, + ) { + let new = + BankForks::load_from_snapshot(&genesis_block, account_paths, &bank_forks.snapshot_path) + .unwrap(); for (slot, _) in new.banks.iter() { if *slot > 0 { let bank = bank_forks.banks.get(slot).unwrap().clone(); @@ -531,10 +540,9 @@ mod tests { bank.freeze(); let slot = bank.slot(); bank_forks.insert(bank); - println!("add snapshot {}", slot); bank_forks.add_snapshot(slot, 0).unwrap(); } - restore_from_snapshot(&genesis_block, bank_forks, index); + restore_from_snapshot(&genesis_block, bank_forks, Some(path.paths.clone()), index); } } } diff --git a/core/src/validator.rs b/core/src/validator.rs index 0b2ceba87..cd110ccfc 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -298,7 +298,8 @@ fn get_bank_forks( snapshot_path: Option, ) -> (BankForks, Vec, LeaderScheduleCache) { if snapshot_path.is_some() { - let bank_forks = BankForks::load_from_snapshot(&genesis_block, &snapshot_path); + let bank_forks = + BankForks::load_from_snapshot(&genesis_block, account_paths.clone(), &snapshot_path); match bank_forks { Ok(v) => { let bank = &v.working_bank(); diff --git a/multinode-demo/fullnode.sh b/multinode-demo/fullnode.sh index a607689c4..b9f5edce6 100755 --- a/multinode-demo/fullnode.sh +++ b/multinode-demo/fullnode.sh @@ -315,12 +315,9 @@ elif [[ $node_type = bootstrap_leader ]]; then vote_keypair_path="$SOLANA_CONFIG_DIR"/bootstrap-leader-vote-keypair.json ledger_config_dir="$SOLANA_CONFIG_DIR"/bootstrap-leader-ledger accounts_config_dir="$SOLANA_CONFIG_DIR"/bootstrap-leader-accounts - storage_keypair_path=$SOLANA_CONFIG_DIR/bootstrap-leader-storage-keypair.json -<<<<<<< HEAD - configured_flag=$SOLANA_CONFIG_DIR/bootstrap-leader.configured -======= snapshot_config_dir="$SOLANA_CONFIG_DIR"/bootstrap-leader-snapshots ->>>>>>> validator restart + storage_keypair_path=$SOLANA_CONFIG_DIR/bootstrap-leader-storage-keypair.json + configured_flag=$SOLANA_CONFIG_DIR/bootstrap-leader.configured default_arg --rpc-port 8899 if ((airdrops_enabled)); then @@ -342,11 +339,8 @@ elif [[ $node_type = validator ]]; then storage_keypair_path=$SOLANA_CONFIG_DIR/validator-storage-keypair$label.json ledger_config_dir=$SOLANA_CONFIG_DIR/validator-ledger$label accounts_config_dir=$SOLANA_CONFIG_DIR/validator-accounts$label -<<<<<<< HEAD - configured_flag=$SOLANA_CONFIG_DIR/validator$label.configured -======= snapshot_config_dir="$SOLANA_CONFIG_DIR"/validator-snapshots$label ->>>>>>> validator restart + configured_flag=$SOLANA_CONFIG_DIR/validator$label.configured mkdir -p "$SOLANA_CONFIG_DIR" [[ -r "$identity_keypair_path" ]] || $solana_keygen new -o "$identity_keypair_path" @@ -432,11 +426,13 @@ while true; do ( set -x - if [[ -d "$SOLANA_RSYNC_CONFIG_DIR"/snapshots ]]; then - if [[ ! -d $snapshot_config_dir ]]; then - cp -a "$SOLANA_RSYNC_CONFIG_DIR"/snapshots/ "$snapshot_config_dir" - cp -a "$SOLANA_RSYNC_CONFIG_DIR"/accounts/ "$accounts_config_dir" - fi + if [[ $node_type = validator ]]; then + rm -rf "$ledger_config_dir" + if [[ -d "$SOLANA_RSYNC_CONFIG_DIR"/snapshots ]]; then + rm -rf "$snapshot_config_dir" "$accounts_config_dir" + cp -a "$SOLANA_RSYNC_CONFIG_DIR"/snapshots/ "$snapshot_config_dir" + cp -a "$SOLANA_RSYNC_CONFIG_DIR"/accounts/ "$accounts_config_dir" + fi fi if [[ ! -d "$ledger_config_dir" ]]; then cp -a "$SOLANA_RSYNC_CONFIG_DIR"/ledger/ "$ledger_config_dir" diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index fc3cdfdce..c6bf21491 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -14,6 +14,7 @@ bv = { version = "0.11.0", features = ["serde"] } byteorder = "1.3.2" fnv = "1.0.6" hashbrown = "0.2.0" +lazy_static = "1.3.0" libc = "0.2.58" libloading = "0.5.1" log = "0.4.2" diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index a82d48852..93fe57535 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -8,7 +8,6 @@ use crate::blockhash_queue::BlockhashQueue; use crate::message_processor::has_duplicates; use bincode::serialize; use log::*; -use serde::{Deserialize, Serialize}; use solana_metrics::inc_new_counter_error; use solana_sdk::account::{Account, LamportCredit}; use solana_sdk::hash::{Hash, Hasher}; @@ -22,6 +21,8 @@ use solana_sdk::transaction::{Transaction, TransactionError}; use std::collections::{HashMap, HashSet}; use std::env; use std::fs::remove_dir_all; +use std::io::{BufReader, Read}; +use std::iter::once; use std::ops::Neg; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -31,17 +32,16 @@ const ACCOUNTSDB_DIR: &str = "accountsdb"; const NUM_ACCOUNT_DIRS: usize = 4; /// This structure handles synchronization for db -#[derive(Default, Debug, Serialize, Deserialize)] +#[derive(Default, Debug)] pub struct Accounts { /// Single global AccountsDB - #[serde(skip)] pub accounts_db: Arc, /// set of accounts which are currently in the pipeline account_locks: Mutex>, /// List of persistent stores - paths: String, + pub paths: String, /// set to true if object created the directories in paths /// when true, delete parents of 'paths' on drop @@ -113,6 +113,13 @@ impl Accounts { } } + pub fn update_from_stream( + &self, + stream: &mut BufReader, + ) -> std::result::Result<(), std::io::Error> { + self.accounts_db.update_from_stream(stream) + } + fn load_tx_accounts( storage: &AccountStorage, ancestors: &HashMap, @@ -190,7 +197,6 @@ impl Accounts { Some(program) => program, None => { error_counters.account_not_found += 1; - info!("ancestors {:?}, accouts index {:?}, id {:?}", ancestors, accounts_index, program_id); return Err(TransactionError::ProgramAccountNotFound); } }; @@ -486,7 +492,7 @@ mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; - use bincode::{deserialize_from, serialize_into, serialized_size}; + use bincode::{serialize_into, serialized_size}; use rand::{thread_rng, Rng}; use solana_sdk::account::Account; use solana_sdk::fee_calculator::FeeCalculator; @@ -1013,17 +1019,14 @@ mod tests { check_accounts(&accounts, &pubkeys, 100); accounts.add_root(0); - let sz = - serialized_size(&accounts).unwrap() + serialized_size(&*accounts.accounts_db).unwrap(); + let sz = serialized_size(&*accounts.accounts_db).unwrap(); let mut buf = vec![0u8; sz as usize]; let mut writer = Cursor::new(&mut buf[..]); - serialize_into(&mut writer, &accounts).unwrap(); serialize_into(&mut writer, &*accounts.accounts_db).unwrap(); - let mut reader = Cursor::new(&mut buf[..]); - let mut daccounts: Accounts = deserialize_from(&mut reader).unwrap(); - let accounts_db: AccountsDB = deserialize_from(&mut reader).unwrap(); - daccounts.accounts_db = Arc::new(accounts_db); + let mut reader = BufReader::new(&buf[..]); + let daccounts = Accounts::new(Some("serialize_accounts".to_string())); + assert!(daccounts.update_from_stream(&mut reader).is_ok()); check_accounts(&daccounts, &pubkeys, 100); assert_eq!( accounts.hash_internal_state(0), diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 577d47bb2..689e6f611 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -33,7 +33,7 @@ use solana_sdk::pubkey::Pubkey; use std::collections::{HashMap, HashSet}; use std::fmt; use std::fs::{create_dir_all, remove_dir_all}; -use std::io::Cursor; +use std::io::{BufReader, Cursor, Error, ErrorKind, Read}; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -311,6 +311,44 @@ impl AccountsDB { Self::new_with_file_size(paths, ACCOUNT_DATA_FILE_SIZE) } + pub fn update_from_stream( + &self, + mut stream: &mut BufReader, + ) -> Result<(), std::io::Error> { + AppendVec::set_account_paths(&self.paths); + + let _len: usize = deserialize_from(&mut stream) + .map_err(|_| AccountsDB::get_io_error("len deserialize error"))?; + let accounts_index: AccountsIndex = deserialize_from(&mut stream) + .map_err(|_| AccountsDB::get_io_error("accounts index deserialize error"))?; + let storage: AccountStorage = deserialize_from(&mut stream) + .map_err(|_| AccountsDB::get_io_error("storage deserialize error"))?; + let version: usize = deserialize_from(&mut stream) + .map_err(|_| AccountsDB::get_io_error("write version deserialize error"))?; + + let mut ids: Vec = storage + .0 + .values() + .flat_map(HashMap::keys) + .cloned() + .collect(); + ids.sort(); + + { + let mut index = self.accounts_index.write().unwrap(); + index.account_maps.extend(accounts_index.account_maps); + let union = index.roots.union(&accounts_index.roots); + index.roots = union.cloned().collect(); + index.last_root = accounts_index.last_root; + } + *self.storage.write().unwrap() = storage; + self.next_id + .store(ids[ids.len() - 1] + 1, Ordering::Relaxed); + self.write_version.store(version, Ordering::Relaxed); + self.generate_index(); + Ok(()) + } + fn new_storage_entry(&self, fork_id: Fork, path: &str) -> AccountStorageEntry { AccountStorageEntry::new( path, @@ -576,10 +614,17 @@ impl AccountsDB { } } - fn generate_index(&mut self) { + fn get_io_error(error: &str) -> Error { + warn!("AccountsDB error: {:?}", error); + Error::new(ErrorKind::Other, error) + } + + fn generate_index(&self) { let mut forks: Vec = self.storage.read().unwrap().0.keys().cloned().collect(); forks.sort(); + let mut accounts_index = self.accounts_index.write().unwrap(); + accounts_index.roots.insert(0); for fork_id in forks.iter() { let mut accumulator: Vec> = self .scan_account_storage( @@ -603,7 +648,6 @@ impl AccountsDB { while let Some(maps) = accumulator.pop() { AccountsDB::merge(&mut account_maps, &maps); } - let mut accounts_index = self.accounts_index.write().unwrap(); for (pubkey, (_, account_info)) in account_maps.iter() { accounts_index.add_index(*fork_id, pubkey, account_info.clone()); } @@ -618,90 +662,27 @@ impl Serialize for AccountsDB { { use serde::ser::Error; let len = serialized_size(&self.accounts_index).unwrap() - + serialized_size(&self.paths).unwrap() + serialized_size(&self.storage).unwrap() - + std::mem::size_of::() as u64 - + serialized_size(&self.file_size).unwrap(); + + std::mem::size_of::() as u64; let mut buf = vec![0u8; len as usize]; let mut wr = Cursor::new(&mut buf[..]); serialize_into(&mut wr, &self.accounts_index).map_err(Error::custom)?; - serialize_into(&mut wr, &self.paths).map_err(Error::custom)?; serialize_into(&mut wr, &self.storage).map_err(Error::custom)?; serialize_into( &mut wr, &(self.write_version.load(Ordering::Relaxed) as u64), ) .map_err(Error::custom)?; - serialize_into(&mut wr, &self.file_size).map_err(Error::custom)?; let len = wr.position() as usize; serializer.serialize_bytes(&wr.into_inner()[..len]) } } -struct AccountsDBVisitor; - -impl<'a> serde::de::Visitor<'a> for AccountsDBVisitor { - type Value = AccountsDB; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("Expecting AccountsDB") - } - - fn visit_bytes(self, data: &[u8]) -> std::result::Result - where - E: serde::de::Error, - { - use serde::de::Error; - let mut rd = Cursor::new(&data[..]); - let accounts_index: RwLock> = - deserialize_from(&mut rd).map_err(Error::custom)?; - let paths: Vec = deserialize_from(&mut rd).map_err(Error::custom)?; - let storage: RwLock = deserialize_from(&mut rd).map_err(Error::custom)?; - let write_version: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; - let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; - - let mut ids: Vec = storage - .read() - .unwrap() - .0 - .values() - .flat_map(HashMap::keys) - .cloned() - .collect(); - ids.sort(); - - let mut accounts_db = AccountsDB { - accounts_index, - storage, - next_id: AtomicUsize::new(ids[ids.len() - 1] + 1), - write_version: AtomicUsize::new(write_version as usize), - paths, - file_size, - thread_pool: rayon::ThreadPoolBuilder::new() - .num_threads(2) - .build() - .unwrap(), - }; - accounts_db.generate_index(); - - Ok(accounts_db) - } -} - -impl<'de> Deserialize<'de> for AccountsDB { - fn deserialize(deserializer: D) -> std::result::Result - where - D: ::serde::Deserializer<'de>, - { - deserializer.deserialize_bytes(AccountsDBVisitor) - } -} - #[cfg(test)] mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; - use bincode::{deserialize_from, serialize_into, serialized_size}; + use bincode::{serialize_into, serialized_size}; use maplit::hashmap; use rand::{thread_rng, Rng}; use solana_sdk::account::Account; @@ -1008,6 +989,7 @@ mod tests { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.0[&0].len(), 1); assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::Available); + assert_eq!(stores.0[&0][&0].count(), count); stores.0[&0][&0].count() == count } @@ -1257,8 +1239,9 @@ mod tests { let mut writer = Cursor::new(&mut buf[..]); serialize_into(&mut writer, &accounts).unwrap(); - let mut reader = Cursor::new(&mut buf[..]); - let daccounts: AccountsDB = deserialize_from(&mut reader).unwrap(); + let mut reader = BufReader::new(&buf[..]); + let daccounts = AccountsDB::new(&paths.paths); + assert!(daccounts.update_from_stream(&mut reader).is_ok()); check_accounts(&daccounts, &pubkeys, 0, 100, 2); check_accounts(&daccounts, &pubkeys1, 1, 10, 1); } diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index 3fd563f3f..3f9dfd17d 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -1,4 +1,6 @@ use bincode::{deserialize_from, serialize_into, serialized_size}; +use lazy_static::lazy_static; +use log::warn; use memmap::MmapMut; use serde::{Deserialize, Serialize}; use solana_sdk::account::Account; @@ -61,6 +63,10 @@ impl<'a> StoredAccount<'a> { } } +lazy_static! { + static ref ACCOUNT_PATHS: Mutex> = Mutex::new(vec![]); +} + #[derive(Debug)] #[allow(clippy::mutex_atomic)] pub struct AppendVec { @@ -256,6 +262,10 @@ impl AppendVec { pub fn append_account_test(&self, data: &(StorageMeta, Account)) -> Option { self.append_account(data.0.clone(), &data.1) } + + pub fn set_account_paths(paths: &[String]) { + ACCOUNT_PATHS.lock().unwrap().extend_from_slice(paths); + } } pub mod test_utils { @@ -347,20 +357,34 @@ impl<'a> serde::de::Visitor<'a> for AppendVecVisitor { let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; let offset: usize = deserialize_from(&mut rd).map_err(Error::custom)?; + let split_path: Vec<&str> = path.to_str().unwrap().rsplit('/').collect(); + let account_paths = ACCOUNT_PATHS.lock().unwrap().clone(); + let mut account_path = path.clone(); + for dir_path in account_paths.iter() { + let fullpath = format!("{}/{}/{}", dir_path, split_path[1], split_path[0]); + let file_path = Path::new(&fullpath); + if file_path.exists() { + account_path = file_path.to_path_buf(); + break; + } + } + let data = OpenOptions::new() .read(true) .write(true) .create(false) - .open(path.as_path()); + .open(account_path.as_path()); if data.is_err() { - std::fs::create_dir_all(&path.parent().unwrap()).expect("Create directory failed"); - return Ok(AppendVec::new(&path, true, file_size as usize)); + warn!("account open {:?} failed, create empty", account_path); + std::fs::create_dir_all(&account_path.parent().unwrap()) + .expect("Create directory failed"); + return Ok(AppendVec::new(&account_path, true, file_size as usize)); } let map = unsafe { MmapMut::map_mut(&data.unwrap()).expect("failed to map the data file") }; Ok(AppendVec { - path, + path: account_path, map, append_offset: Mutex::new(offset), current_len: AtomicUsize::new(current_len as usize), diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 04561bb0d..ece3eda0d 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -42,7 +42,7 @@ use solana_sdk::transaction::{Result, Transaction, TransactionError}; use std::cmp; use std::collections::HashMap; use std::fmt; -use std::io::Cursor; +use std::io::{BufReader, Cursor, Read}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::Instant; @@ -60,59 +60,45 @@ pub struct BankRc { parent: RwLock>>, } +impl BankRc { + pub fn new(account_paths: Option) -> Self { + let accounts = Accounts::new(account_paths); + BankRc { + accounts: Arc::new(accounts), + parent: RwLock::new(None), + } + } + + pub fn update_from_stream( + &self, + mut stream: &mut BufReader, + ) -> std::result::Result<(), std::io::Error> { + let _len: usize = deserialize_from(&mut stream) + .map_err(|_| BankRc::get_io_error("len deserialize error"))?; + self.accounts.update_from_stream(stream) + } + + fn get_io_error(error: &str) -> std::io::Error { + warn!("BankRc error: {:?}", error); + std::io::Error::new(std::io::ErrorKind::Other, error) + } +} + impl Serialize for BankRc { fn serialize(&self, serializer: S) -> std::result::Result where S: serde::ser::Serializer, { use serde::ser::Error; - let len = serialized_size(&*self.accounts.accounts_db).unwrap() - + serialized_size(&*self.accounts).unwrap(); + let len = serialized_size(&*self.accounts.accounts_db).unwrap(); let mut buf = vec![0u8; len as usize]; let mut wr = Cursor::new(&mut buf[..]); - serialize_into(&mut wr, &*self.accounts).map_err(Error::custom)?; serialize_into(&mut wr, &*self.accounts.accounts_db).map_err(Error::custom)?; let len = wr.position() as usize; serializer.serialize_bytes(&wr.into_inner()[..len]) } } -struct BankRcVisitor; - -impl<'a> serde::de::Visitor<'a> for BankRcVisitor { - type Value = BankRc; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("Expecting BankRc") - } - - #[allow(clippy::mutex_atomic)] - fn visit_bytes(self, data: &[u8]) -> std::result::Result - where - E: serde::de::Error, - { - use serde::de::Error; - let mut rd = Cursor::new(&data[..]); - let mut accounts: Accounts = deserialize_from(&mut rd).map_err(Error::custom)?; - let accounts_db: AccountsDB = deserialize_from(&mut rd).map_err(Error::custom)?; - - accounts.accounts_db = Arc::new(accounts_db); - Ok(BankRc { - accounts: Arc::new(accounts), - parent: RwLock::new(None), - }) - } -} - -impl<'de> Deserialize<'de> for BankRc { - fn deserialize(deserializer: D) -> std::result::Result - where - D: ::serde::Deserializer<'de>, - { - deserializer.deserialize_bytes(BankRcVisitor) - } -} - #[derive(Default)] pub struct StatusCacheRc { /// where all the Accounts are stored @@ -381,6 +367,18 @@ impl Bank { &self.collector_id } + pub fn create_with_genesis( + genesis_block: &GenesisBlock, + account_paths: Option, + status_cache_rc: &StatusCacheRc, + ) -> Self { + let mut bank = Self::default(); + bank.set_bank_rc(&BankRc::new(account_paths), &status_cache_rc); + bank.process_genesis_block(genesis_block); + bank.ancestors.insert(0, 0); + bank + } + pub fn slot(&self) -> u64 { self.slot } @@ -2582,10 +2580,14 @@ mod tests { serialize_into(&mut writer, &bank).unwrap(); serialize_into(&mut writer, &bank.rc).unwrap(); - let mut reader = Cursor::new(&mut buf[..]); - let mut dbank: Bank = deserialize_from(&mut reader).unwrap(); - let dbank_rc: BankRc = deserialize_from(&mut reader).unwrap(); - dbank.rc = dbank_rc; + let mut rdr = Cursor::new(&buf[..]); + let mut dbank: Bank = deserialize_from(&mut rdr).unwrap(); + let mut reader = BufReader::new(&buf[rdr.position() as usize..]); + dbank.set_bank_rc( + &BankRc::new(Some(bank0.accounts().paths.clone())), + &StatusCacheRc::default(), + ); + assert!(dbank.rc.update_from_stream(&mut reader).is_ok()); assert_eq!(dbank.get_balance(&key.pubkey()), 10); bank.compare_bank(&dbank); }