diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index 421872e950..690c210933 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -1,10 +1,15 @@ //! The `bank_forks` module implments BankForks a DAG of checkpointed Banks -use hashbrown::{HashMap, HashSet}; +use bincode::{deserialize_from, serialize_into}; use solana_metrics::inc_new_counter_info; -use solana_runtime::bank::Bank; +use solana_runtime::bank::{Bank, BankRc, StatusCacheRc}; use solana_sdk::timing; +use std::collections::{HashMap, HashSet}; +use std::fs; +use std::fs::File; +use std::io::{BufReader, BufWriter, Error, ErrorKind}; use std::ops::Index; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Instant; @@ -12,6 +17,8 @@ pub struct BankForks { banks: HashMap>, working_bank: Arc, root: u64, + slots: HashSet, + snapshot_path: Option, } impl Index for BankForks { @@ -30,6 +37,8 @@ impl BankForks { banks, working_bank, root: 0, + slots: HashSet::new(), + snapshot_path: None, } } @@ -45,6 +54,7 @@ impl BankForks { } /// Create a map of bank slot id to the set of all of its descendants + #[allow(clippy::or_fun_call)] pub fn descendants(&self) -> HashMap> { let mut descendants = HashMap::new(); for bank in self.banks.values() { @@ -91,6 +101,8 @@ impl BankForks { root, banks, working_bank, + slots: HashSet::new(), + snapshot_path: None, } } @@ -138,9 +150,199 @@ impl BankForks { } fn prune_non_root(&mut self, root: u64) { + let slots: HashSet = self + .banks + .iter() + .filter(|(_, b)| b.is_frozen()) + .map(|(k, _)| *k) + .collect(); let descendants = self.descendants(); self.banks - .retain(|slot, _| descendants[&root].contains(slot)) + .retain(|slot, _| descendants[&root].contains(slot)); + if self.snapshot_path.is_some() { + let diff: HashSet<_> = slots.symmetric_difference(&self.slots).collect(); + trace!("prune non root {} - {:?}", root, diff); + for slot in diff.iter() { + if **slot > root { + let _ = self.add_snapshot(**slot, root); + } else if **slot > 0 { + BankForks::remove_snapshot(**slot, &self.snapshot_path); + } + } + } + self.slots = slots.clone(); + } + + fn get_io_error(error: &str) -> Error { + warn!("BankForks error: {:?}", error); + Error::new(ErrorKind::Other, error) + } + + fn get_snapshot_path(path: &Option) -> PathBuf { + Path::new(&path.clone().unwrap()).to_path_buf() + } + + pub fn add_snapshot(&self, slot: u64, root: u64) -> Result<(), Error> { + let path = BankForks::get_snapshot_path(&self.snapshot_path); + fs::create_dir_all(path.clone())?; + let bank_file = format!("{}", slot); + let bank_file_path = path.join(bank_file); + trace!("path: {:?}", bank_file_path); + let file = File::create(bank_file_path)?; + let mut stream = BufWriter::new(file); + let bank_slot = self.get(slot); + if bank_slot.is_none() { + return Err(BankForks::get_io_error("bank_forks get error")); + } + let bank = bank_slot.unwrap().clone(); + serialize_into(&mut stream, &*bank) + .map_err(|_| BankForks::get_io_error("serialize bank error"))?; + let mut parent_slot: u64 = 0; + if let Some(parent_bank) = bank.parent() { + parent_slot = parent_bank.slot(); + } + serialize_into(&mut stream, &parent_slot) + .map_err(|_| BankForks::get_io_error("serialize bank parent error"))?; + serialize_into(&mut stream, &root) + .map_err(|_| BankForks::get_io_error("serialize root error"))?; + serialize_into(&mut stream, &bank.src) + .map_err(|_| BankForks::get_io_error("serialize bank status cache error"))?; + serialize_into(&mut stream, &bank.rc) + .map_err(|_| BankForks::get_io_error("serialize bank accounts error"))?; + Ok(()) + } + + pub fn remove_snapshot(slot: u64, path: &Option) { + let path = BankForks::get_snapshot_path(path); + let bank_file = format!("{}", slot); + let bank_file_path = path.join(bank_file); + let _ = fs::remove_file(bank_file_path); + } + + pub fn set_snapshot_config(&mut self, path: Option) { + self.snapshot_path = path; + } + + fn load_snapshots( + names: &[u64], + bank_maps: &mut Vec<(u64, u64, Bank)>, + status_cache_rc: &StatusCacheRc, + snapshot_path: &Option, + ) -> Option<(BankRc, u64)> { + let path = BankForks::get_snapshot_path(snapshot_path); + let mut bank_rc: Option<(BankRc, u64)> = None; + + for bank_slot in names.iter().rev() { + let bank_path = format!("{}", bank_slot); + let bank_file_path = path.join(bank_path.clone()); + info!("Load from {:?}", bank_file_path); + let file = File::open(bank_file_path); + if file.is_err() { + warn!("Snapshot file open failed for {}", bank_slot); + continue; + } + let file = file.unwrap(); + let mut stream = BufReader::new(file); + let bank: Result = deserialize_from(&mut stream) + .map_err(|_| BankForks::get_io_error("deserialize bank error")); + let slot: Result = deserialize_from(&mut stream) + .map_err(|_| BankForks::get_io_error("deserialize bank parent error")); + let parent_slot = if slot.is_ok() { slot.unwrap() } else { 0 }; + let root: Result = deserialize_from(&mut stream) + .map_err(|_| BankForks::get_io_error("deserialize root error")); + let status_cache: Result = deserialize_from(&mut stream) + .map_err(|_| BankForks::get_io_error("deserialize bank status cache error")); + if bank_rc.is_none() { + let rc: Result = deserialize_from(&mut stream) + .map_err(|_| BankForks::get_io_error("deserialize bank accounts error")); + if rc.is_ok() { + bank_rc = Some((rc.unwrap(), root.unwrap())); + } + } + if bank_rc.is_some() { + match bank { + Ok(v) => { + if status_cache.is_ok() { + status_cache_rc.append(&status_cache.unwrap()); + } + bank_maps.push((*bank_slot, parent_slot, v)); + } + Err(_) => warn!("Load snapshot failed for {}", bank_slot), + } + } else { + BankForks::remove_snapshot(*bank_slot, snapshot_path); + warn!("Load snapshot rc failed for {}", bank_slot); + } + } + bank_rc + } + + fn setup_banks( + bank_maps: &mut Vec<(u64, u64, Bank)>, + bank_rc: &BankRc, + status_cache_rc: &StatusCacheRc, + ) -> (HashMap>, HashSet, u64) { + let mut banks = HashMap::new(); + let mut slots = HashSet::new(); + let (last_slot, last_parent_slot, mut last_bank) = bank_maps.remove(0); + last_bank.set_bank_rc(&bank_rc, &status_cache_rc); + + while let Some((slot, parent_slot, mut bank)) = bank_maps.pop() { + bank.set_bank_rc(&bank_rc, &status_cache_rc); + if parent_slot != 0 { + if let Some(parent) = banks.get(&parent_slot) { + bank.set_parent(parent); + } + } + if slot > 0 { + banks.insert(slot, Arc::new(bank)); + slots.insert(slot); + } + } + if last_parent_slot != 0 { + if let Some(parent) = banks.get(&last_parent_slot) { + last_bank.set_parent(parent); + } + } + banks.insert(last_slot, Arc::new(last_bank)); + slots.insert(last_slot); + + (banks, slots, last_slot) + } + + pub fn load_from_snapshot(snapshot_path: &Option) -> Result { + let path = BankForks::get_snapshot_path(snapshot_path); + let paths = fs::read_dir(path)?; + let mut names = paths + .filter_map(|entry| { + entry.ok().and_then(|e| { + e.path() + .file_name() + .and_then(|n| n.to_str().map(|s| s.parse::().unwrap())) + }) + }) + .collect::>(); + + names.sort(); + let mut bank_maps = vec![]; + let status_cache_rc = StatusCacheRc::default(); + let rc = BankForks::load_snapshots(&names, &mut bank_maps, &status_cache_rc, snapshot_path); + if bank_maps.is_empty() || rc.is_none() { + BankForks::remove_snapshot(0, snapshot_path); + return Err(Error::new(ErrorKind::Other, "no snapshots loaded")); + } + + let (bank_rc, root) = rc.unwrap(); + let (banks, slots, last_slot) = + BankForks::setup_banks(&mut bank_maps, &bank_rc, &status_cache_rc); + let working_bank = banks[&last_slot].clone(); + Ok(BankForks { + banks, + working_bank, + root, + slots, + snapshot_path: snapshot_path.clone(), + }) } } @@ -150,6 +352,10 @@ mod tests { use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::system_transaction; + use std::env; + use std::fs::remove_dir_all; #[test] fn test_bank_forks() { @@ -174,8 +380,8 @@ mod tests { let bank = Bank::new_from_parent(&bank0, &Pubkey::default(), 2); bank_forks.insert(bank); let descendants = bank_forks.descendants(); - let children: Vec = descendants[&0].iter().cloned().collect(); - assert_eq!(children, vec![1, 2]); + let children: HashSet = [1u64, 2u64].to_vec().into_iter().collect(); + assert_eq!(children, *descendants.get(&0).unwrap()); assert!(descendants[&1].is_empty()); assert!(descendants[&2].is_empty()); } @@ -219,4 +425,103 @@ mod tests { assert_eq!(bank_forks.active_banks(), vec![1]); } + struct TempPaths { + pub paths: String, + } + + #[macro_export] + macro_rules! tmp_bank_accounts_name { + () => { + &format!("{}-{}", file!(), line!()) + }; + } + + #[macro_export] + macro_rules! get_tmp_bank_accounts_path { + () => { + get_tmp_bank_accounts_path(tmp_bank_accounts_name!()) + }; + } + + impl Drop for TempPaths { + fn drop(&mut self) { + let paths: Vec = self.paths.split(',').map(|s| s.to_string()).collect(); + paths.iter().for_each(|p| { + let _ignored = remove_dir_all(p); + }); + } + } + + fn get_paths_vec(paths: &str) -> Vec { + paths.split(',').map(|s| s.to_string()).collect() + } + + fn get_tmp_snapshots_path() -> TempPaths { + let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); + let path = format!("{}/snapshots", out_dir); + TempPaths { + paths: path.to_string(), + } + } + + fn get_tmp_bank_accounts_path(paths: &str) -> TempPaths { + let vpaths = get_paths_vec(paths); + let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); + let vpaths: Vec<_> = vpaths + .iter() + .map(|path| format!("{}/{}", out_dir, path)) + .collect(); + TempPaths { + paths: vpaths.join(","), + } + } + + fn restore_from_snapshot(bank_forks: BankForks, last_slot: u64) { + let new = BankForks::load_from_snapshot(&bank_forks.snapshot_path).unwrap(); + for (slot, _) in new.banks.iter() { + let bank = bank_forks.banks.get(slot).unwrap().clone(); + let new_bank = new.banks.get(slot).unwrap(); + bank.compare_bank(&new_bank); + } + assert_eq!(new.working_bank().slot(), last_slot); + for (slot, _) in new.banks.iter() { + BankForks::remove_snapshot(*slot, &bank_forks.snapshot_path); + } + } + + #[test] + fn test_bank_forks_snapshot_n() { + solana_logger::setup(); + let path = get_tmp_bank_accounts_path!(); + let spath = get_tmp_snapshots_path(); + let GenesisBlockInfo { + genesis_block, + mint_keypair, + .. + } = create_genesis_block(10_000); + for index in 0..10 { + let bank0 = Bank::new_with_paths(&genesis_block, Some(path.paths.clone())); + bank0.freeze(); + let slot = bank0.slot(); + let mut bank_forks = BankForks::new(0, bank0); + bank_forks.set_snapshot_config(Some(spath.paths.clone())); + bank_forks.add_snapshot(slot, 0).unwrap(); + for forks in 0..index { + let bank = Bank::new_from_parent(&bank_forks[forks], &Pubkey::default(), forks + 1); + let key1 = Keypair::new().pubkey(); + let tx = system_transaction::create_user_account( + &mint_keypair, + &key1, + 1, + genesis_block.hash(), + ); + assert_eq!(bank.process_transaction(&tx), Ok(())); + bank.freeze(); + let slot = bank.slot(); + bank_forks.insert(bank); + bank_forks.add_snapshot(slot, 0).unwrap(); + } + restore_from_snapshot(bank_forks, index); + } + } } diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 373807ecdc..318fd8148c 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -11,7 +11,7 @@ use solana_kvstore as kvstore; use bincode::deserialize; -use hashbrown::HashMap; +use std::collections::HashMap; #[cfg(not(feature = "kvstore"))] use rocksdb; diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index e102ceb79d..a24b1f3712 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -26,7 +26,6 @@ use crate::staking_utils; use crate::streamer::{BlobReceiver, BlobSender}; use bincode::{deserialize, serialize}; use core::cmp; -use hashbrown::HashMap; use rand::{thread_rng, Rng}; use rayon::prelude::*; use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error}; @@ -40,7 +39,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::timing::{duration_as_ms, timestamp}; use solana_sdk::transaction::Transaction; use std::cmp::min; -use std::collections::BTreeSet; +use std::collections::{BTreeSet, HashMap}; use std::fmt; use std::io; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 7ced08637b..c860b905ca 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -8,10 +8,10 @@ use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_pull::CrdsGossipPull; use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}; use crate::crds_value::CrdsValue; -use hashbrown::HashMap; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; +use std::collections::HashMap; ///The min size for bloom filters pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000; diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 596c6d3002..94d4b6203a 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -16,13 +16,13 @@ use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use crate::packet::BLOB_DATA_SIZE; use bincode::serialized_size; -use hashbrown::HashMap; use rand; use rand::distributions::{Distribution, WeightedIndex}; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use std::cmp; +use std::collections::HashMap; use std::collections::VecDeque; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 24459d5137..bcc00ca7a2 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -15,7 +15,6 @@ use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use crate::packet::BLOB_DATA_SIZE; use bincode::serialized_size; -use hashbrown::HashMap; use indexmap::map::IndexMap; use rand; use rand::distributions::{Distribution, WeightedIndex}; @@ -25,6 +24,7 @@ use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; use std::cmp; +use std::collections::HashMap; pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30; pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6; diff --git a/core/src/locktower.rs b/core/src/locktower.rs index 6d29fde3fb..24ea4131b6 100644 --- a/core/src/locktower.rs +++ b/core/src/locktower.rs @@ -1,13 +1,12 @@ use crate::bank_forks::BankForks; use crate::staking_utils; -use hashbrown::{HashMap, HashSet}; use solana_metrics::datapoint_info; use solana_runtime::bank::Bank; use solana_sdk::account::Account; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_vote_api::vote_state::{Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY}; -use std::collections::VecDeque; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; pub const VOTE_THRESHOLD_DEPTH: usize = 8; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index adefbc348c..ff33042b9c 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -13,7 +13,6 @@ use crate::poh_recorder::PohRecorder; use crate::result::{Error, Result}; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; -use hashbrown::HashMap; use solana_metrics::{datapoint_warn, inc_new_counter_error, inc_new_counter_info}; use solana_runtime::bank::Bank; use solana_sdk::hash::Hash; @@ -22,6 +21,7 @@ use solana_sdk::signature::KeypairUtil; use solana_sdk::timing::{self, duration_as_ms}; use solana_sdk::transaction::Transaction; use solana_vote_api::vote_instruction; +use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex, RwLock}; @@ -510,7 +510,7 @@ impl ReplayStage { let bank_slot = bank.slot(); let bank_progress = &mut progress .entry(bank_slot) - .or_insert(ForkProgress::new(bank.last_blockhash())); + .or_insert_with(|| ForkProgress::new(bank.last_blockhash())); blocktree.get_slot_entries_with_blob_count(bank_slot, bank_progress.num_blobs as u64, None) } @@ -522,7 +522,7 @@ impl ReplayStage { ) -> Result<()> { let bank_progress = &mut progress .entry(bank.slot()) - .or_insert(ForkProgress::new(bank.last_blockhash())); + .or_insert_with(|| ForkProgress::new(bank.last_blockhash())); let result = Self::verify_and_process_entries(&bank, &entries, &bank_progress.last_entry); bank_progress.num_blobs += num; if let Some(last_entry) = entries.last() { diff --git a/core/src/staking_utils.rs b/core/src/staking_utils.rs index cc5e0decf8..e94bb5b043 100644 --- a/core/src/staking_utils.rs +++ b/core/src/staking_utils.rs @@ -1,9 +1,9 @@ -use hashbrown::HashMap; use solana_runtime::bank::Bank; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; use solana_vote_api::vote_state::VoteState; use std::borrow::Borrow; +use std::collections::HashMap; /// Looks through vote accounts, and finds the latest slot that has achieved /// supermajority lockout @@ -47,7 +47,7 @@ pub fn vote_account_stakes_at_epoch( /// that have non-zero balance in any of their managed staking accounts pub fn staked_nodes_at_epoch(bank: &Bank, epoch_height: u64) -> Option> { bank.epoch_vote_accounts(epoch_height) - .map(|vote_accounts| to_staked_nodes(to_vote_states(vote_accounts.into_iter()))) + .map(|vote_accounts| to_staked_nodes(to_vote_states(vote_accounts.iter()))) } // input (vote_pubkey, (stake, vote_account)) => (stake, vote_state) @@ -78,7 +78,7 @@ fn epoch_stakes_and_lockouts(bank: &Bank, epoch_height: u64) -> Vec<(u64, Option let node_staked_accounts = bank .epoch_vote_accounts(epoch_height) .expect("Bank state for epoch is missing") - .into_iter(); + .iter(); to_vote_states(node_staked_accounts) .map(|(stake, states)| (stake, states.root_slot)) @@ -116,13 +116,13 @@ pub(crate) mod tests { create_genesis_block, create_genesis_block_with_leader, GenesisBlockInfo, BOOTSTRAP_LEADER_LAMPORTS, }; - use hashbrown::HashSet; use solana_sdk::instruction::Instruction; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::transaction::Transaction; use solana_stake_api::stake_instruction; use solana_vote_api::vote_instruction; + use std::collections::HashSet; use std::iter::FromIterator; use std::sync::Arc; diff --git a/core/src/validator.rs b/core/src/validator.rs index d5b42d92e4..b66ac4a0a3 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -38,6 +38,7 @@ pub struct ValidatorConfig { pub storage_rotate_count: u64, pub account_paths: Option, pub rpc_config: JsonRpcConfig, + pub snapshot_path: Option, } impl Default for ValidatorConfig { fn default() -> Self { @@ -52,6 +53,7 @@ impl Default for ValidatorConfig { storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE, account_paths: None, rpc_config: JsonRpcConfig::default(), + snapshot_path: None, } } } @@ -93,7 +95,11 @@ impl Validator { completed_slots_receiver, leader_schedule_cache, poh_config, - ) = new_banks_from_blocktree(ledger_path, config.account_paths.clone()); + ) = new_banks_from_blocktree( + ledger_path, + config.account_paths.clone(), + config.snapshot_path.clone(), + ); let leader_schedule_cache = Arc::new(leader_schedule_cache); let exit = Arc::new(AtomicBool::new(false)); @@ -108,7 +114,7 @@ impl Validator { let blocktree = Arc::new(blocktree); let poh_config = Arc::new(poh_config); - let (poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal( + let (mut poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal( bank.tick_height(), bank.last_blockhash(), bank.slot(), @@ -120,6 +126,10 @@ impl Validator { &leader_schedule_cache, &poh_config, ); + if config.snapshot_path.is_some() { + poh_recorder.set_bank(&bank); + } + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_service = PohService::new(poh_recorder.clone(), &poh_config, &exit); assert_eq!( @@ -280,9 +290,40 @@ impl Validator { } } +fn get_bank_forks( + genesis_block: &GenesisBlock, + blocktree: &Blocktree, + account_paths: Option, + snapshot_path: Option, +) -> (BankForks, Vec, LeaderScheduleCache) { + if snapshot_path.is_some() { + let bank_forks = BankForks::load_from_snapshot(&snapshot_path); + match bank_forks { + Ok(v) => { + let bank = &v.working_bank(); + let fork_info = BankForksInfo { + bank_slot: bank.slot(), + entry_height: bank.tick_height(), + }; + return (v, vec![fork_info], LeaderScheduleCache::new_from_bank(bank)); + } + Err(_) => warn!("Failed to load from snapshot, fallback to load from ledger"), + } + } + let (mut bank_forks, bank_forks_info, leader_schedule_cache) = + blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths) + .expect("process_blocktree failed"); + if snapshot_path.is_some() { + bank_forks.set_snapshot_config(snapshot_path); + let _ = bank_forks.add_snapshot(0, 0); + } + (bank_forks, bank_forks_info, leader_schedule_cache) +} + pub fn new_banks_from_blocktree( blocktree_path: &str, account_paths: Option, + snapshot_path: Option, ) -> ( BankForks, Vec, @@ -300,8 +341,7 @@ pub fn new_banks_from_blocktree( .expect("Expected to successfully open database ledger"); let (bank_forks, bank_forks_info, leader_schedule_cache) = - blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths) - .expect("process_blocktree failed"); + get_bank_forks(&genesis_block, &blocktree, account_paths, snapshot_path); ( bank_forks, diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs index f26f975cbf..c9d8e06ae6 100644 --- a/core/tests/cluster_info.rs +++ b/core/tests/cluster_info.rs @@ -1,9 +1,9 @@ -use hashbrown::{HashMap, HashSet}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon::prelude::*; use solana::cluster_info::{compute_retransmit_peers, ClusterInfo}; use solana::contact_info::ContactInfo; use solana_sdk::pubkey::Pubkey; +use std::collections::{HashMap, HashSet}; use std::sync::mpsc::channel; use std::sync::mpsc::TryRecvError; use std::sync::mpsc::{Receiver, Sender}; diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index f0f085db8d..9d7981bfff 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -1,5 +1,4 @@ use bincode::serialized_size; -use hashbrown::HashMap; use log::*; use rayon::prelude::*; use solana::contact_info::ContactInfo; @@ -11,6 +10,7 @@ use solana::crds_value::CrdsValueLabel; use solana_sdk::hash::hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; +use std::collections::HashMap; use std::sync::{Arc, Mutex}; type Node = Arc>; diff --git a/core/tests/tvu.rs b/core/tests/tvu.rs index 4a847cc8b2..1cbca862c6 100644 --- a/core/tests/tvu.rs +++ b/core/tests/tvu.rs @@ -94,7 +94,7 @@ fn test_replay() { completed_slots_receiver, leader_schedule_cache, _, - ) = validator::new_banks_from_blocktree(&blocktree_path, None); + ) = validator::new_banks_from_blocktree(&blocktree_path, None, None); let working_bank = bank_forks.working_bank(); assert_eq!( working_bank.get_balance(&mint_keypair.pubkey()), diff --git a/run.sh b/run.sh index d1918a6bed..cf43667579 100755 --- a/run.sh +++ b/run.sh @@ -40,9 +40,24 @@ export RUST_BACKTRACE=1 dataDir=$PWD/target/"$(basename "$0" .sh)" set -x -solana-keygen -o "$dataDir"/config/leader-keypair.json -solana-keygen -o "$dataDir"/config/leader-vote-account-keypair.json -solana-keygen -o "$dataDir"/config/leader-stake-account-keypair.json +leader_keypair="$dataDir/config/leader-keypair.json" +if [[ -e $leader_keypair ]]; then + echo "Use existing leader keypair" +else + solana-keygen -o "$leader_keypair" +fi +leader_vote_account_keypair="$dataDir/config/leader-vote-account-keypair.json" +if [[ -e $leader_vote_account_keypair ]]; then + echo "Use existing leader vote account keypair" +else + solana-keygen -o "$leader_vote_account_keypair" +fi +leader_stake_account_keypair="$dataDir/config/leader-stake-account-keypair.json" +if [[ -e $leader_stake_account_keypair ]]; then + echo "Use existing leader stake account keypair" +else + solana-keygen -o "$leader_stake_account_keypair" +fi solana-keygen -o "$dataDir"/config/drone-keypair.json solana-keygen -o "$dataDir"/config/leader-storage-account-keypair.json @@ -76,6 +91,8 @@ args=( --gossip-port 8001 --rpc-port 8899 --rpc-drone-address 127.0.0.1:9900 + --accounts "$dataDir"/accounts + --snapshot-path "$dataDir"/snapshots ) if [[ -n $blockstreamSocket ]]; then args+=(--blockstream "$blockstreamSocket") diff --git a/runtime/benches/status_cache.rs b/runtime/benches/status_cache.rs new file mode 100644 index 0000000000..29310098b7 --- /dev/null +++ b/runtime/benches/status_cache.rs @@ -0,0 +1,33 @@ +#![feature(test)] + +extern crate test; + +use bincode::serialize; +use solana_runtime::status_cache::*; +use solana_sdk::hash::{hash, Hash}; +use solana_sdk::signature::Signature; +use test::Bencher; + +type BankStatusCache = StatusCache<()>; + +#[bench] +fn test_statuscache_serialize(bencher: &mut Bencher) { + let mut status_cache = BankStatusCache::default(); + status_cache.add_root(0); + status_cache.clear_signatures(); + for hash_index in 0..100 { + let blockhash = Hash::new(&vec![hash_index; std::mem::size_of::()]); + let mut id = blockhash; + for _ in 0..100 { + id = hash(id.as_ref()); + let mut sigbytes = Vec::from(id.as_ref()); + id = hash(id.as_ref()); + sigbytes.extend(id.as_ref()); + let sig = Signature::new(&sigbytes); + status_cache.insert(&blockhash, &sig, 0, ()); + } + } + bencher.iter(|| { + let _ = serialize(&status_cache).unwrap(); + }); +} diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index d7e2123809..9d6d0d873f 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -1,13 +1,13 @@ use crate::accounts_db::{ - get_paths_vec, AccountInfo, AccountStorage, AccountsDB, ErrorCounters, InstructionAccounts, - InstructionLoaders, + get_paths_vec, AccountInfo, AccountStorage, AccountsDB, AppendVecId, ErrorCounters, + InstructionAccounts, InstructionLoaders, }; use crate::accounts_index::{AccountsIndex, Fork}; use crate::append_vec::StoredAccount; use crate::message_processor::has_duplicates; use bincode::serialize; -use hashbrown::{HashMap, HashSet}; use log::*; +use serde::{Deserialize, Serialize}; use solana_metrics::inc_new_counter_error; use solana_sdk::account::Account; use solana_sdk::fee_calculator::FeeCalculator; @@ -19,6 +19,7 @@ use solana_sdk::system_program; use solana_sdk::transaction::Result; use solana_sdk::transaction::{Transaction, TransactionError}; use std::borrow::Borrow; +use std::collections::{HashMap, HashSet}; use std::env; use std::fs::remove_dir_all; use std::iter::once; @@ -51,15 +52,18 @@ type RecordLocks = ( ); /// This structure handles synchronization for db -#[derive(Default)] +#[derive(Default, Debug, Serialize, Deserialize)] pub struct Accounts { /// Single global AccountsDB + #[serde(skip)] pub accounts_db: Arc, /// set of accounts which are currently in the pipeline + #[serde(skip)] account_locks: AccountLocks, /// set of accounts which are about to record + commit + #[serde(skip)] record_locks: Mutex, /// List of persistent stores @@ -72,16 +76,16 @@ pub struct Accounts { impl Drop for Accounts { fn drop(&mut self) { - let paths = get_paths_vec(&self.paths); - paths.iter().for_each(|p| { - let _ignored = remove_dir_all(p); + if self.own_paths { + let paths = get_paths_vec(&self.paths); + paths.iter().for_each(|p| { + let _ignored = remove_dir_all(p); - // it is safe to delete the parent - if self.own_paths { + // it is safe to delete the parent let path = Path::new(p); let _ignored = remove_dir_all(path.parent().unwrap()); - } - }); + }); + } } } @@ -330,7 +334,9 @@ impl Accounts { pub fn load_by_program(&self, fork: Fork, program_id: &Pubkey) -> Vec<(Pubkey, Account)> { let accumulator: Vec> = self.accounts_db.scan_account_storage( fork, - |stored_account: &StoredAccount, accum: &mut Vec<(Pubkey, u64, Account)>| { + |stored_account: &StoredAccount, + _id: AppendVecId, + accum: &mut Vec<(Pubkey, u64, Account)>| { if stored_account.balance.owner == *program_id { let val = ( stored_account.meta.pubkey, @@ -441,7 +447,9 @@ impl Accounts { pub fn hash_internal_state(&self, fork_id: Fork) -> Option { let accumulator: Vec> = self.accounts_db.scan_account_storage( fork_id, - |stored_account: &StoredAccount, accum: &mut Vec<(Pubkey, u64, Hash)>| { + |stored_account: &StoredAccount, + _id: AppendVecId, + accum: &mut Vec<(Pubkey, u64, Hash)>| { accum.push(( stored_account.meta.pubkey, stored_account.meta.write_version, @@ -586,11 +594,14 @@ mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; + use bincode::{deserialize_from, serialize_into, serialized_size}; + use rand::{thread_rng, Rng}; use solana_sdk::account::Account; use solana_sdk::hash::Hash; use solana_sdk::instruction::CompiledInstruction; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::transaction::Transaction; + use std::io::Cursor; use std::thread::{sleep, Builder}; use std::time::Duration; @@ -1156,4 +1167,51 @@ mod tests { assert!(parent_record_locks2.is_empty()); } } + + fn create_accounts(accounts: &Accounts, pubkeys: &mut Vec, num: usize) { + for t in 0..num { + let pubkey = Pubkey::new_rand(); + let account = Account::new((t + 1) as u64, 0, &Account::default().owner); + accounts.store_slow(0, &pubkey, &account); + pubkeys.push(pubkey.clone()); + } + } + + fn check_accounts(accounts: &Accounts, pubkeys: &Vec, num: usize) { + for _ in 1..num { + let idx = thread_rng().gen_range(0, num - 1); + let ancestors = vec![(0, 0)].into_iter().collect(); + let account = accounts.load_slow(&ancestors, &pubkeys[idx]).unwrap(); + let account1 = Account::new((idx + 1) as u64, 0, &Account::default().owner); + assert_eq!(account, (account1, 0)); + } + } + + #[test] + fn test_accounts_serialize() { + solana_logger::setup(); + let accounts = Accounts::new(Some("serialize_accounts".to_string())); + + let mut pubkeys: Vec = vec![]; + create_accounts(&accounts, &mut pubkeys, 100); + check_accounts(&accounts, &pubkeys, 100); + accounts.add_root(0); + + let sz = + serialized_size(&accounts).unwrap() + serialized_size(&*accounts.accounts_db).unwrap(); + let mut buf = vec![0u8; sz as usize]; + let mut writer = Cursor::new(&mut buf[..]); + serialize_into(&mut writer, &accounts).unwrap(); + serialize_into(&mut writer, &*accounts.accounts_db).unwrap(); + + let mut reader = Cursor::new(&mut buf[..]); + let mut daccounts: Accounts = deserialize_from(&mut reader).unwrap(); + let accounts_db: AccountsDB = deserialize_from(&mut reader).unwrap(); + daccounts.accounts_db = Arc::new(accounts_db); + check_accounts(&daccounts, &pubkeys, 100); + assert_eq!( + accounts.hash_internal_state(0), + daccounts.hash_internal_state(0) + ); + } } diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 6e35bd3d88..7484b1dc70 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -20,14 +20,20 @@ use crate::accounts_index::{AccountsIndex, Fork}; use crate::append_vec::{AppendVec, StorageMeta, StoredAccount}; -use hashbrown::{HashMap, HashSet}; +use bincode::{deserialize_from, serialize_into, serialized_size}; use log::*; use rand::{thread_rng, Rng}; use rayon::prelude::*; use rayon::ThreadPool; +use serde::de::{MapAccess, Visitor}; +use serde::ser::{SerializeMap, Serializer}; +use serde::{Deserialize, Serialize}; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; +use std::collections::{HashMap, HashSet}; +use std::fmt; use std::fs::{create_dir_all, remove_dir_all}; +use std::io::Cursor; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -53,7 +59,7 @@ pub struct ErrorCounters { pub missing_signature_for_fee: usize, } -#[derive(Default, Clone)] +#[derive(Deserialize, Serialize, Default, Debug, PartialEq, Clone)] pub struct AccountInfo { /// index identifying the append storage id: AppendVecId, @@ -66,18 +72,67 @@ pub struct AccountInfo { lamports: u64, } /// An offset into the AccountsDB::storage vector -type AppendVecId = usize; -pub type AccountStorage = HashMap>; +pub type AppendVecId = usize; pub type InstructionAccounts = Vec; pub type InstructionLoaders = Vec>; -#[derive(Debug, PartialEq, Clone, Copy)] +#[derive(Default, Debug)] +pub struct AccountStorage(HashMap>); + +struct AccountStorageVisitor; + +impl<'de> Visitor<'de> for AccountStorageVisitor { + type Value = AccountStorage; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting AccountStorage") + } + + #[allow(clippy::mutex_atomic)] + fn visit_map(self, mut access: M) -> Result + where + M: MapAccess<'de>, + { + let mut map = HashMap::new(); + + while let Some((key, value)) = access.next_entry()? { + map.insert(key, Arc::new(value)); + } + + Ok(AccountStorage(map)) + } +} + +impl Serialize for AccountStorage { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut map = serializer.serialize_map(Some(self.0.len()))?; + for (k, v) in &self.0 { + map.serialize_entry(k, &**v)?; + } + map.end() + } +} + +impl<'de> Deserialize<'de> for AccountStorage { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + deserializer.deserialize_map(AccountStorageVisitor) + } +} + +#[derive(Debug, PartialEq, Copy, Clone, Deserialize, Serialize)] pub enum AccountStorageStatus { StorageAvailable = 0, StorageFull = 1, } /// Persistent storage structure holding the accounts +#[derive(Debug, Deserialize, Serialize)] pub struct AccountStorageEntry { id: AppendVecId, @@ -96,7 +151,7 @@ pub struct AccountStorageEntry { impl AccountStorageEntry { pub fn new(path: &str, fork_id: Fork, id: usize, file_size: u64) -> Self { - let p = format!("{}/{}", path, id); + let p = format!("{}/{}.{}", path, fork_id, id); let path = Path::new(&p); let _ignored = remove_dir_all(path); create_dir_all(path).expect("Create directory failed"); @@ -169,6 +224,7 @@ impl AccountStorageEntry { } // This structure handles the load/store of the accounts +#[derive(Debug)] pub struct AccountsDB { /// Keeps tracks of index into AppendVec on a per fork basis pub accounts_index: RwLock>, @@ -200,7 +256,7 @@ impl Default for AccountsDB { fn default() -> Self { AccountsDB { accounts_index: RwLock::new(AccountsIndex::default()), - storage: RwLock::new(HashMap::new()), + storage: RwLock::new(AccountStorage(HashMap::new())), next_id: AtomicUsize::new(0), write_version: AtomicUsize::new(0), paths: Vec::default(), @@ -218,7 +274,7 @@ impl AccountsDB { let paths = get_paths_vec(&paths); AccountsDB { accounts_index: RwLock::new(AccountsIndex::default()), - storage: RwLock::new(HashMap::new()), + storage: RwLock::new(AccountStorage(HashMap::new())), next_id: AtomicUsize::new(0), write_version: AtomicUsize::new(0), paths, @@ -244,7 +300,7 @@ impl AccountsDB { } pub fn has_accounts(&self, fork: Fork) -> bool { - for x in self.storage.read().unwrap().values() { + for x in self.storage.read().unwrap().0.values() { if x.fork_id == fork && x.count() > 0 { return true; } @@ -256,7 +312,7 @@ impl AccountsDB { // PERF: Sequentially read each storage entry in parallel pub fn scan_account_storage(&self, fork_id: Fork, scan_func: F) -> Vec where - F: Fn(&StoredAccount, &mut B) -> (), + F: Fn(&StoredAccount, AppendVecId, &mut B) -> (), F: Send + Sync, B: Send + Default, { @@ -264,6 +320,7 @@ impl AccountsDB { .storage .read() .unwrap() + .0 .values() .filter(|store| store.fork_id == fork_id) .cloned() @@ -274,9 +331,9 @@ impl AccountsDB { .map(|storage| { let accounts = storage.accounts.accounts(0); let mut retval = B::default(); - accounts - .iter() - .for_each(|stored_account| scan_func(stored_account, &mut retval)); + accounts.iter().for_each(|stored_account| { + scan_func(stored_account, storage.id, &mut retval) + }); retval }) .collect() @@ -292,6 +349,7 @@ impl AccountsDB { let (info, fork) = accounts_index.get(pubkey, ancestors)?; //TODO: thread this as a ref storage + .0 .get(&info.id) .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) .map(|account| (account, fork)) @@ -311,6 +369,7 @@ impl AccountsDB { let mut candidates: Vec> = { let stores = self.storage.read().unwrap(); stores + .0 .values() .filter_map(|x| { if x.status() == AccountStorageStatus::StorageAvailable && x.fork_id == fork_id @@ -326,7 +385,7 @@ impl AccountsDB { let mut stores = self.storage.write().unwrap(); let path_index = thread_rng().gen_range(0, self.paths.len()); let storage = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_index])); - stores.insert(storage.id, storage.clone()); + stores.0.insert(storage.id, storage.clone()); candidates.push(storage); } let rv = thread_rng().gen_range(0, candidates.len()); @@ -336,9 +395,8 @@ impl AccountsDB { pub fn purge_fork(&self, fork: Fork) { //add_root should be called first let is_root = self.accounts_index.read().unwrap().is_root(fork); - trace!("PURGING {} {}", fork, is_root); if !is_root { - self.storage.write().unwrap().retain(|_, v| { + self.storage.write().unwrap().0.retain(|_, v| { trace!("PURGING {} {}", v.fork_id, fork); v.fork_id != fork }); @@ -400,7 +458,7 @@ impl AccountsDB { fn remove_dead_accounts(&self, reclaims: Vec<(Fork, AccountInfo)>) -> HashSet { let storage = self.storage.read().unwrap(); for (fork_id, account_info) in reclaims { - if let Some(store) = storage.get(&account_info.id) { + if let Some(store) = storage.0.get(&account_info.id) { assert_eq!( fork_id, store.fork_id, "AccountDB::accounts_index corrupted. Storage should only point to one fork" @@ -410,6 +468,7 @@ impl AccountsDB { } //TODO: performance here could be improved if AccountsDB::storage was organized by fork let dead_forks: HashSet = storage + .0 .values() .filter_map(|x| { if x.count() == 0 { @@ -420,6 +479,7 @@ impl AccountsDB { }) .collect(); let live_forks: HashSet = storage + .0 .values() .filter_map(|x| if x.count() > 0 { Some(x.fork_id) } else { None }) .collect(); @@ -451,12 +511,147 @@ impl AccountsDB { pub fn add_root(&self, fork: Fork) { self.accounts_index.write().unwrap().add_root(fork) } + + fn merge( + dest: &mut HashMap, + source: &HashMap, + ) { + for (key, (source_version, source_info)) in source.iter() { + if let Some((dest_version, _)) = dest.get(key) { + if dest_version > source_version { + continue; + } + } + dest.insert(*key, (*source_version, source_info.clone())); + } + } + + fn generate_index(&mut self) { + let mut forks: Vec = self + .storage + .read() + .unwrap() + .0 + .values() + .map(|x| x.fork_id) + .collect(); + + forks.sort(); + for fork_id in forks.iter() { + let mut accumulator: Vec> = self + .scan_account_storage( + *fork_id, + |stored_account: &StoredAccount, + id: AppendVecId, + accum: &mut HashMap| { + let account_info = AccountInfo { + id, + offset: stored_account.offset, + lamports: stored_account.balance.lamports, + }; + accum.insert( + stored_account.meta.pubkey, + (stored_account.meta.write_version, account_info), + ); + }, + ); + + let mut account_maps = accumulator.pop().unwrap(); + while let Some(maps) = accumulator.pop() { + AccountsDB::merge(&mut account_maps, &maps); + } + let mut accounts_index = self.accounts_index.write().unwrap(); + for (pubkey, (_, account_info)) in account_maps.iter() { + accounts_index.add_index(*fork_id, pubkey, account_info.clone()); + } + } + } +} + +impl Serialize for AccountsDB { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::ser::Serializer, + { + use serde::ser::Error; + let len = serialized_size(&self.accounts_index).unwrap() + + serialized_size(&self.paths).unwrap() + + serialized_size(&self.storage).unwrap() + + std::mem::size_of::() as u64 + + serialized_size(&self.file_size).unwrap(); + let mut buf = vec![0u8; len as usize]; + let mut wr = Cursor::new(&mut buf[..]); + serialize_into(&mut wr, &self.accounts_index).map_err(Error::custom)?; + serialize_into(&mut wr, &self.paths).map_err(Error::custom)?; + serialize_into(&mut wr, &self.storage).map_err(Error::custom)?; + serialize_into( + &mut wr, + &(self.write_version.load(Ordering::Relaxed) as u64), + ) + .map_err(Error::custom)?; + serialize_into(&mut wr, &self.file_size).map_err(Error::custom)?; + let len = wr.position() as usize; + serializer.serialize_bytes(&wr.into_inner()[..len]) + } +} + +struct AccountsDBVisitor; + +impl<'a> serde::de::Visitor<'a> for AccountsDBVisitor { + type Value = AccountsDB; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting AccountsDB") + } + + fn visit_bytes(self, data: &[u8]) -> std::result::Result + where + E: serde::de::Error, + { + use serde::de::Error; + let mut rd = Cursor::new(&data[..]); + let accounts_index: RwLock> = + deserialize_from(&mut rd).map_err(Error::custom)?; + let paths: Vec = deserialize_from(&mut rd).map_err(Error::custom)?; + let storage: RwLock = deserialize_from(&mut rd).map_err(Error::custom)?; + let write_version: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; + let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; + + let mut ids: Vec = storage.read().unwrap().0.keys().cloned().collect(); + ids.sort(); + + let mut accounts_db = AccountsDB { + accounts_index, + storage, + next_id: AtomicUsize::new(ids[ids.len() - 1] + 1), + write_version: AtomicUsize::new(write_version as usize), + paths, + file_size, + thread_pool: rayon::ThreadPoolBuilder::new() + .num_threads(2) + .build() + .unwrap(), + }; + accounts_db.generate_index(); + + Ok(accounts_db) + } +} + +impl<'de> Deserialize<'de> for AccountsDB { + fn deserialize(deserializer: D) -> std::result::Result + where + D: ::serde::Deserializer<'de>, + { + deserializer.deserialize_bytes(AccountsDBVisitor) + } } #[cfg(test)] mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; + use bincode::{deserialize_from, serialize_into, serialized_size}; use rand::{thread_rng, Rng}; use solana_sdk::account::Account; @@ -654,16 +849,16 @@ mod tests { db.store(1, &[(&pubkeys[0], &account)]); { let stores = db.storage.read().unwrap(); - assert_eq!(stores.len(), 2); - assert_eq!(stores[&0].count(), 2); - assert_eq!(stores[&1].count(), 2); + assert_eq!(stores.0.len(), 2); + assert_eq!(stores.0[&0].count(), 2); + assert_eq!(stores.0[&1].count(), 2); } db.add_root(1); { let stores = db.storage.read().unwrap(); - assert_eq!(stores.len(), 2); - assert_eq!(stores[&0].count(), 2); - assert_eq!(stores[&1].count(), 2); + assert_eq!(stores.0.len(), 2); + assert_eq!(stores.0[&0].count(), 2); + assert_eq!(stores.0[&1].count(), 2); } } @@ -736,19 +931,40 @@ mod tests { fn check_storage(accounts: &AccountsDB, count: usize) -> bool { let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.len(), 1); - assert_eq!(stores[&0].status(), AccountStorageStatus::StorageAvailable); - stores[&0].count() == count + assert_eq!(stores.0.len(), 1); + assert_eq!( + stores.0[&0].status(), + AccountStorageStatus::StorageAvailable + ); + stores.0[&0].count() == count } - fn check_accounts(accounts: &AccountsDB, pubkeys: &Vec, fork: Fork) { - for _ in 1..100 { - let idx = thread_rng().gen_range(0, 99); + fn check_accounts( + accounts: &AccountsDB, + pubkeys: &Vec, + fork: Fork, + num: usize, + count: usize, + ) { + for _ in 1..num { + let idx = thread_rng().gen_range(0, num - 1); let ancestors = vec![(fork, 0)].into_iter().collect(); let account = accounts.load_slow(&ancestors, &pubkeys[idx]).unwrap(); - let mut default_account = Account::default(); - default_account.lamports = (idx + 1) as u64; - assert_eq!((default_account, 0), account); + let account1 = Account::new((idx + count) as u64, 0, &Account::default().owner); + assert_eq!(account, (account1, fork)); + } + } + + fn modify_accounts( + accounts: &AccountsDB, + pubkeys: &Vec, + fork: Fork, + num: usize, + count: usize, + ) { + for idx in 0..num { + let account = Account::new((idx + count) as u64, 0, &Account::default().owner); + accounts.store(fork, &[(&pubkeys[idx], &account)]); } } @@ -771,7 +987,7 @@ mod tests { let accounts = AccountsDB::new(&paths.paths); let mut pubkeys: Vec = vec![]; create_account(&accounts, &mut pubkeys, 0, 100, 0, 0); - check_accounts(&accounts, &pubkeys, 0); + check_accounts(&accounts, &pubkeys, 0, 100, 1); } #[test] @@ -805,7 +1021,7 @@ mod tests { } let mut append_vec_histogram = HashMap::new(); - for storage in accounts.storage.read().unwrap().values() { + for storage in accounts.storage.read().unwrap().0.values() { *append_vec_histogram.entry(storage.fork_id).or_insert(0) += 1; } for count in append_vec_histogram.values() { @@ -827,9 +1043,12 @@ mod tests { accounts.store(0, &[(&pubkey1, &account1)]); { let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.len(), 1); - assert_eq!(stores[&0].count(), 1); - assert_eq!(stores[&0].status(), AccountStorageStatus::StorageAvailable); + assert_eq!(stores.0.len(), 1); + assert_eq!(stores.0[&0].count(), 1); + assert_eq!( + stores.0[&0].status(), + AccountStorageStatus::StorageAvailable + ); } let pubkey2 = Pubkey::new_rand(); @@ -837,11 +1056,14 @@ mod tests { accounts.store(0, &[(&pubkey2, &account2)]); { let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.len(), 2); - assert_eq!(stores[&0].count(), 1); - assert_eq!(stores[&0].status(), AccountStorageStatus::StorageFull); - assert_eq!(stores[&1].count(), 1); - assert_eq!(stores[&1].status(), AccountStorageStatus::StorageAvailable); + assert_eq!(stores.0.len(), 2); + assert_eq!(stores.0[&0].count(), 1); + assert_eq!(stores.0[&0].status(), AccountStorageStatus::StorageFull); + assert_eq!(stores.0[&1].count(), 1); + assert_eq!( + stores.0[&1].status(), + AccountStorageStatus::StorageAvailable + ); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!( @@ -859,13 +1081,13 @@ mod tests { accounts.store(0, &[(&pubkey1, &account1)]); { let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.len(), 3); - assert_eq!(stores[&0].count(), count[index]); - assert_eq!(stores[&0].status(), status[0]); - assert_eq!(stores[&1].count(), 1); - assert_eq!(stores[&1].status(), status[1]); - assert_eq!(stores[&2].count(), count[index ^ 1]); - assert_eq!(stores[&2].status(), status[0]); + assert_eq!(stores.0.len(), 3); + assert_eq!(stores.0[&0].count(), count[index]); + assert_eq!(stores.0[&0].status(), status[0]); + assert_eq!(stores.0[&1].count(), 1); + assert_eq!(stores.0[&1].status(), status[1]); + assert_eq!(stores.0[&2].count(), count[index ^ 1]); + assert_eq!(stores.0[&2].status(), status[0]); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!( @@ -928,17 +1150,42 @@ mod tests { assert!(accounts.accounts_index.read().unwrap().is_purged(0)); //fork is still there, since gc is lazy - assert!(accounts.storage.read().unwrap().get(&info.id).is_some()); + assert!(accounts.storage.read().unwrap().0.get(&info.id).is_some()); //store causes cleanup accounts.store(1, &[(&pubkey, &account)]); //fork is gone - assert!(accounts.storage.read().unwrap().get(&info.id).is_none()); + assert!(accounts.storage.read().unwrap().0.get(&info.id).is_none()); //new value is there let ancestors = vec![(1, 1)].into_iter().collect(); assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1))); } + #[test] + fn test_accounts_db_serialize() { + solana_logger::setup(); + let paths = get_tmp_accounts_path!(); + let accounts = AccountsDB::new(&paths.paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 0, 100, 0, 0); + assert_eq!(check_storage(&accounts, 100), true); + check_accounts(&accounts, &pubkeys, 0, 100, 1); + modify_accounts(&accounts, &pubkeys, 0, 100, 2); + check_accounts(&accounts, &pubkeys, 0, 100, 2); + accounts.add_root(0); + + let mut pubkeys1: Vec = vec![]; + create_account(&accounts, &mut pubkeys1, 1, 10, 0, 0); + + let mut buf = vec![0u8; serialized_size(&accounts).unwrap() as usize]; + let mut writer = Cursor::new(&mut buf[..]); + serialize_into(&mut writer, &accounts).unwrap(); + + let mut reader = Cursor::new(&mut buf[..]); + let daccounts: AccountsDB = deserialize_from(&mut reader).unwrap(); + check_accounts(&daccounts, &pubkeys, 0, 100, 2); + check_accounts(&daccounts, &pubkeys1, 1, 10, 1); + } } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 46a2582315..d91f144e38 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1,13 +1,17 @@ -use hashbrown::{HashMap, HashSet}; use log::*; +use serde::{Deserialize, Serialize}; use solana_sdk::pubkey::Pubkey; +use std::collections::{HashMap, HashSet}; pub type Fork = u64; -#[derive(Default)] +#[derive(Debug, Default, Deserialize, Serialize)] pub struct AccountsIndex { - account_maps: HashMap>, - roots: HashSet, + #[serde(skip)] + pub account_maps: HashMap>, + + pub roots: HashSet, + //This value that needs to be stored to recover the index from AppendVec pub last_root: Fork, } @@ -35,7 +39,7 @@ impl AccountsIndex { let mut rv = vec![]; let mut fork_vec: Vec<(Fork, T)> = vec![]; { - let entry = self.account_maps.entry(*pubkey).or_insert(vec![]); + let entry = self.account_maps.entry(*pubkey).or_insert_with(|| vec![]); std::mem::swap(entry, &mut fork_vec); }; @@ -54,11 +58,17 @@ impl AccountsIndex { ); fork_vec.retain(|(fork, _)| !self.is_purged(*fork)); { - let entry = self.account_maps.entry(*pubkey).or_insert(vec![]); + let entry = self.account_maps.entry(*pubkey).or_insert_with(|| vec![]); std::mem::swap(entry, &mut fork_vec); }; rv } + + pub fn add_index(&mut self, fork: Fork, pubkey: &Pubkey, account_info: T) { + let entry = self.account_maps.entry(*pubkey).or_insert_with(|| vec![]); + entry.push((fork, account_info)); + } + pub fn is_purged(&self, fork: Fork) -> bool { fork < self.last_root } diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index fc090c3720..063b347424 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -1,10 +1,13 @@ +use bincode::{deserialize_from, serialize_into, serialized_size}; use memmap::MmapMut; +use serde::{Deserialize, Serialize}; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; +use std::fmt; use std::fs::OpenOptions; -use std::io::{Seek, SeekFrom, Write}; +use std::io::{Cursor, Seek, SeekFrom, Write}; use std::mem; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; @@ -26,7 +29,7 @@ pub struct StorageMeta { pub data_len: u64, } -#[derive(Serialize, Deserialize, Clone, Default, Eq, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Debug, Default, Eq, PartialEq)] pub struct AccountBalance { /// lamports in the account pub lamports: u64, @@ -38,11 +41,13 @@ pub struct AccountBalance { /// References to Memory Mapped memory /// The Account is stored separately from its data, so getting the actual account requires a clone +#[derive(PartialEq, Debug)] pub struct StoredAccount<'a> { pub meta: &'a StorageMeta, /// account data pub balance: &'a AccountBalance, pub data: &'a [u8], + pub offset: usize, } impl<'a> StoredAccount<'a> { @@ -56,7 +61,10 @@ impl<'a> StoredAccount<'a> { } } +#[derive(Debug)] +#[allow(clippy::mutex_atomic)] pub struct AppendVec { + path: PathBuf, map: MmapMut, // This mutex forces append to be single threaded, but concurrent with reads append_offset: Mutex, @@ -64,6 +72,12 @@ pub struct AppendVec { file_size: u64, } +impl Drop for AppendVec { + fn drop(&mut self) { + let _ignored = std::fs::remove_dir_all(&self.path.parent().unwrap()); + } +} + impl AppendVec { #[allow(clippy::mutex_atomic)] pub fn new(file: &Path, create: bool, size: usize) -> Self { @@ -82,6 +96,7 @@ impl AppendVec { let map = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") }; AppendVec { + path: file.to_path_buf(), map, // This mutex forces append to be single threaded, but concurrent with reads // See UNSAFE usage in `append_ptr` @@ -184,6 +199,7 @@ impl AppendVec { meta, balance, data, + offset, }, next, )) @@ -259,13 +275,13 @@ pub mod test_utils { fn drop(&mut self) { let mut path = PathBuf::new(); std::mem::swap(&mut path, &mut self.path); - let _ = std::fs::remove_file(path); + let _ignored = std::fs::remove_file(path); } } pub fn get_append_vec_path(path: &str) -> TempFile { let out_dir = - std::env::var("OUT_DIR").unwrap_or_else(|_| "/tmp/append_vec_tests".to_string()); + std::env::var("OUT_DIR").unwrap_or_else(|_| "target/append_vec_tests".to_string()); let mut buf = PathBuf::new(); let rand_string: String = thread_rng().sample_iter(&Alphanumeric).take(30).collect(); buf.push(&format!("{}/{}{}", out_dir, path, rand_string)); @@ -286,6 +302,82 @@ pub mod test_utils { } } +#[allow(clippy::mutex_atomic)] +impl Serialize for AppendVec { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::ser::Serializer, + { + use serde::ser::Error; + let len = serialized_size(&self.path).unwrap() + + std::mem::size_of::() as u64 + + std::mem::size_of::() as u64 + + std::mem::size_of::() as u64; + let mut buf = vec![0u8; len as usize]; + let mut wr = Cursor::new(&mut buf[..]); + serialize_into(&mut wr, &self.path).map_err(Error::custom)?; + serialize_into(&mut wr, &(self.current_len.load(Ordering::Relaxed) as u64)) + .map_err(Error::custom)?; + serialize_into(&mut wr, &self.file_size).map_err(Error::custom)?; + let offset = *self.append_offset.lock().unwrap(); + serialize_into(&mut wr, &offset).map_err(Error::custom)?; + let len = wr.position() as usize; + serializer.serialize_bytes(&wr.into_inner()[..len]) + } +} + +struct AppendVecVisitor; + +impl<'a> serde::de::Visitor<'a> for AppendVecVisitor { + type Value = AppendVec; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting AppendVec") + } + + #[allow(clippy::mutex_atomic)] + fn visit_bytes(self, data: &[u8]) -> std::result::Result + where + E: serde::de::Error, + { + use serde::de::Error; + let mut rd = Cursor::new(&data[..]); + let path: PathBuf = deserialize_from(&mut rd).map_err(Error::custom)?; + let current_len: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; + let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; + let offset: usize = deserialize_from(&mut rd).map_err(Error::custom)?; + + let data = OpenOptions::new() + .read(true) + .write(true) + .create(false) + .open(path.as_path()); + + if data.is_err() { + std::fs::create_dir_all(&path.parent().unwrap()).expect("Create directory failed"); + return Ok(AppendVec::new(&path, true, file_size as usize)); + } + + let map = unsafe { MmapMut::map_mut(&data.unwrap()).expect("failed to map the data file") }; + Ok(AppendVec { + path, + map, + append_offset: Mutex::new(offset), + current_len: AtomicUsize::new(current_len as usize), + file_size, + }) + } +} + +impl<'de> Deserialize<'de> for AppendVec { + fn deserialize(deserializer: D) -> std::result::Result + where + D: ::serde::Deserializer<'de>, + { + deserializer.deserialize_bytes(AppendVecVisitor) + } +} + #[cfg(test)] pub mod tests { use super::test_utils::*; @@ -355,4 +447,30 @@ pub mod tests { duration_as_ms(&now.elapsed()), ); } + + #[test] + fn test_append_vec_serialize() { + let path = Path::new("append_vec_serialize"); + let av: AppendVec = AppendVec::new(path, true, 1024 * 1024); + let account1 = create_test_account(1); + let index1 = av.append_account_test(&account1).unwrap(); + assert_eq!(index1, 0); + assert_eq!(av.get_account_test(index1).unwrap(), account1); + + let account2 = create_test_account(2); + let index2 = av.append_account_test(&account2).unwrap(); + assert_eq!(av.get_account_test(index2).unwrap(), account2); + assert_eq!(av.get_account_test(index1).unwrap(), account1); + + let mut buf = vec![0u8; serialized_size(&av).unwrap() as usize]; + let mut writer = Cursor::new(&mut buf[..]); + serialize_into(&mut writer, &av).unwrap(); + + let mut reader = Cursor::new(&mut buf[..]); + let dav: AppendVec = deserialize_from(&mut reader).unwrap(); + + assert_eq!(dav.get_account_test(index2).unwrap(), account2); + assert_eq!(dav.get_account_test(index1).unwrap(), account1); + std::fs::remove_file(path).unwrap(); + } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 5f4a54e7c6..6cfa40f135 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -3,17 +3,20 @@ //! on behalf of the caller, and a low-level API for when they have //! already been signed and verified. use crate::accounts::{AccountLockType, Accounts}; -use crate::accounts_db::{ErrorCounters, InstructionAccounts, InstructionLoaders}; +use crate::accounts_db::{AccountsDB, ErrorCounters, InstructionAccounts, InstructionLoaders}; use crate::accounts_index::Fork; use crate::blockhash_queue::BlockhashQueue; use crate::epoch_schedule::EpochSchedule; use crate::locked_accounts_results::LockedAccountsResults; use crate::message_processor::{MessageProcessor, ProcessInstruction}; +use crate::serde_utils::{ + deserialize_atomicbool, deserialize_atomicusize, serialize_atomicbool, serialize_atomicusize, +}; use crate::stakes::Stakes; use crate::status_cache::StatusCache; -use bincode::serialize; -use hashbrown::HashMap; +use bincode::{deserialize_from, serialize, serialize_into, serialized_size}; use log::*; +use serde::{Deserialize, Serialize}; use solana_metrics::{ datapoint_info, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_info, }; @@ -31,27 +34,155 @@ use solana_sdk::timing::{duration_as_ms, duration_as_us, MAX_RECENT_BLOCKHASHES} use solana_sdk::transaction::{Result, Transaction, TransactionError}; use std::borrow::Borrow; use std::cmp; +use std::collections::HashMap; +use std::fmt; +use std::io::Cursor; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::Instant; type BankStatusCache = StatusCache>; -/// Manager for the state of all accounts and programs after processing its entries. #[derive(Default)] -pub struct Bank { +pub struct BankRc { /// where all the Accounts are stored accounts: Arc, + /// Previous checkpoint of this bank + parent: RwLock>>, +} + +impl Serialize for BankRc { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::ser::Serializer, + { + use serde::ser::Error; + let len = serialized_size(&*self.accounts.accounts_db).unwrap() + + serialized_size(&*self.accounts).unwrap(); + let mut buf = vec![0u8; len as usize]; + let mut wr = Cursor::new(&mut buf[..]); + serialize_into(&mut wr, &*self.accounts).map_err(Error::custom)?; + serialize_into(&mut wr, &*self.accounts.accounts_db).map_err(Error::custom)?; + let len = wr.position() as usize; + serializer.serialize_bytes(&wr.into_inner()[..len]) + } +} + +struct BankRcVisitor; + +impl<'a> serde::de::Visitor<'a> for BankRcVisitor { + type Value = BankRc; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting BankRc") + } + + #[allow(clippy::mutex_atomic)] + fn visit_bytes(self, data: &[u8]) -> std::result::Result + where + E: serde::de::Error, + { + use serde::de::Error; + let mut rd = Cursor::new(&data[..]); + let mut accounts: Accounts = deserialize_from(&mut rd).map_err(Error::custom)?; + let accounts_db: AccountsDB = deserialize_from(&mut rd).map_err(Error::custom)?; + + accounts.accounts_db = Arc::new(accounts_db); + Ok(BankRc { + accounts: Arc::new(accounts), + parent: RwLock::new(None), + }) + } +} + +impl<'de> Deserialize<'de> for BankRc { + fn deserialize(deserializer: D) -> std::result::Result + where + D: ::serde::Deserializer<'de>, + { + deserializer.deserialize_bytes(BankRcVisitor) + } +} + +#[derive(Default)] +pub struct StatusCacheRc { + /// where all the Accounts are stored /// A cache of signature statuses status_cache: Arc>, +} + +impl Serialize for StatusCacheRc { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::ser::Serializer, + { + use serde::ser::Error; + let len = serialized_size(&*self.status_cache).unwrap(); + let mut buf = vec![0u8; len as usize]; + let mut wr = Cursor::new(&mut buf[..]); + { + let mut status_cache = self.status_cache.write().unwrap(); + serialize_into(&mut wr, &*status_cache).map_err(Error::custom)?; + status_cache.merge_caches(); + } + let len = wr.position() as usize; + serializer.serialize_bytes(&wr.into_inner()[..len]) + } +} + +struct StatusCacheRcVisitor; + +impl<'a> serde::de::Visitor<'a> for StatusCacheRcVisitor { + type Value = StatusCacheRc; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting StatusCacheRc") + } + + #[allow(clippy::mutex_atomic)] + fn visit_bytes(self, data: &[u8]) -> std::result::Result + where + E: serde::de::Error, + { + use serde::de::Error; + let mut rd = Cursor::new(&data[..]); + let status_cache: BankStatusCache = deserialize_from(&mut rd).map_err(Error::custom)?; + Ok(StatusCacheRc { + status_cache: Arc::new(RwLock::new(status_cache)), + }) + } +} + +impl<'de> Deserialize<'de> for StatusCacheRc { + fn deserialize(deserializer: D) -> std::result::Result + where + D: ::serde::Deserializer<'de>, + { + deserializer.deserialize_bytes(StatusCacheRcVisitor) + } +} + +impl StatusCacheRc { + pub fn append(&self, status_cache_rc: &StatusCacheRc) { + let sc = status_cache_rc.status_cache.write().unwrap(); + self.status_cache.write().unwrap().append(&sc); + } +} + +/// Manager for the state of all accounts and programs after processing its entries. +#[derive(Default, Deserialize, Serialize)] +pub struct Bank { + /// References to accounts, parent and signature status + #[serde(skip)] + pub rc: BankRc, + + #[serde(skip)] + pub src: StatusCacheRc, /// FIFO queue of `recent_blockhash` items blockhash_queue: RwLock, - /// Previous checkpoint of this bank - parent: RwLock>>, - /// The set of parents including this bank pub ancestors: HashMap, @@ -62,9 +193,13 @@ pub struct Bank { parent_hash: Hash, /// The number of transactions processed without error + #[serde(serialize_with = "serialize_atomicusize")] + #[serde(deserialize_with = "deserialize_atomicusize")] transaction_count: AtomicUsize, // TODO: Use AtomicU64 if/when available /// Bank tick height + #[serde(serialize_with = "serialize_atomicusize")] + #[serde(deserialize_with = "deserialize_atomicusize")] tick_height: AtomicUsize, // TODO: Use AtomicU64 if/when available // Bank max_tick_height @@ -97,6 +232,8 @@ pub struct Bank { /// A boolean reflecting whether any entries were recorded into the PoH /// stream for the slot == self.slot + #[serde(serialize_with = "serialize_atomicbool")] + #[serde(deserialize_with = "deserialize_atomicbool")] is_delta: AtomicBool, /// The Message processor @@ -117,7 +254,7 @@ impl Bank { pub fn new_with_paths(genesis_block: &GenesisBlock, paths: Option) -> Self { let mut bank = Self::default(); bank.ancestors.insert(bank.slot(), 0); - bank.accounts = Arc::new(Accounts::new(paths)); + bank.rc.accounts = Arc::new(Accounts::new(paths)); bank.process_genesis_block(genesis_block); // genesis needs stakes for all epochs up to the epoch implied by // slot = 0 and genesis configuration @@ -137,7 +274,7 @@ impl Bank { let mut bank = Self::default(); bank.blockhash_queue = RwLock::new(parent.blockhash_queue.read().unwrap().clone()); - bank.status_cache = parent.status_cache.clone(); + bank.src.status_cache = parent.src.status_cache.clone(); bank.bank_height = parent.bank_height + 1; bank.fee_calculator = parent.fee_calculator.clone(); @@ -159,11 +296,11 @@ impl Bank { ("bank_height", bank.bank_height, i64) ); - bank.parent = RwLock::new(Some(parent.clone())); + bank.rc.parent = RwLock::new(Some(parent.clone())); bank.parent_hash = parent.hash(); bank.collector_id = *collector_id; - bank.accounts = Arc::new(Accounts::new_from_parent(&parent.accounts)); + bank.rc.accounts = Arc::new(Accounts::new_from_parent(&parent.rc.accounts)); bank.epoch_stakes = { let mut epoch_stakes = parent.epoch_stakes.clone(); @@ -256,19 +393,19 @@ impl Bank { self.freeze(); let parents = self.parents(); - *self.parent.write().unwrap() = None; + *self.rc.parent.write().unwrap() = None; let squash_accounts_start = Instant::now(); for p in parents.iter().rev() { // root forks cannot be purged - self.accounts.add_root(p.slot()); + self.rc.accounts.add_root(p.slot()); } let squash_accounts_ms = duration_as_ms(&squash_accounts_start.elapsed()); let squash_cache_start = Instant::now(); parents .iter() - .for_each(|p| self.status_cache.write().unwrap().add_root(p.slot())); + .for_each(|p| self.src.status_cache.write().unwrap().add_root(p.slot())); let squash_cache_ms = duration_as_ms(&squash_cache_start.elapsed()); datapoint_info!( @@ -280,7 +417,7 @@ impl Bank { /// Return the more recent checkpoint of this bank instance. pub fn parent(&self) -> Option> { - self.parent.read().unwrap().clone() + self.rc.parent.read().unwrap().clone() } fn process_genesis_block(&mut self, genesis_block: &GenesisBlock) { @@ -356,7 +493,7 @@ impl Bank { /// Forget all signatures. Useful for benchmarking. pub fn clear_signatures(&self) { - self.status_cache.write().unwrap().clear_signatures(); + self.src.status_cache.write().unwrap().clear_signatures(); } pub fn can_commit(result: &Result<()>) -> bool { @@ -368,7 +505,7 @@ impl Bank { } fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) { - let mut status_cache = self.status_cache.write().unwrap(); + let mut status_cache = self.src.status_cache.write().unwrap(); for (i, tx) in txs.iter().enumerate() { if Self::can_commit(&res[i]) && !tx.signatures.is_empty() { status_cache.insert( @@ -453,7 +590,7 @@ impl Bank { } // TODO: put this assert back in // assert!(!self.is_frozen()); - let results = self.accounts.lock_accounts(txs); + let results = self.rc.accounts.lock_accounts(txs); LockedAccountsResults::new(results, &self, txs, AccountLockType::AccountLock) } @@ -464,11 +601,12 @@ impl Bank { if locked_accounts_results.needs_unlock { locked_accounts_results.needs_unlock = false; match locked_accounts_results.lock_type() { - AccountLockType::AccountLock => self.accounts.unlock_accounts( + AccountLockType::AccountLock => self.rc.accounts.unlock_accounts( locked_accounts_results.transactions(), locked_accounts_results.locked_accounts_results(), ), AccountLockType::RecordLock => self + .rc .accounts .unlock_record_accounts(locked_accounts_results.transactions()), } @@ -482,12 +620,12 @@ impl Bank { where I: std::borrow::Borrow, { - self.accounts.lock_record_accounts(txs); + self.rc.accounts.lock_record_accounts(txs); LockedAccountsResults::new(vec![], &self, txs, AccountLockType::RecordLock) } pub fn unlock_record_accounts(&self, txs: &[Transaction]) { - self.accounts.unlock_record_accounts(txs) + self.rc.accounts.unlock_record_accounts(txs) } fn load_accounts( @@ -496,7 +634,7 @@ impl Bank { results: Vec>, error_counters: &mut ErrorCounters, ) -> Vec> { - self.accounts.load_accounts( + self.rc.accounts.load_accounts( &self.ancestors, txs, results, @@ -550,7 +688,7 @@ impl Bank { lock_results: Vec>, error_counters: &mut ErrorCounters, ) -> Vec> { - let rcache = self.status_cache.read().unwrap(); + let rcache = self.src.status_cache.read().unwrap(); txs.iter() .zip(lock_results.into_iter()) .map(|(tx, lock_res)| { @@ -770,7 +908,8 @@ impl Bank { // TODO: put this assert back in // assert!(!self.is_frozen()); let now = Instant::now(); - self.accounts + self.rc + .accounts .store_accounts(self.slot(), txs, executed, loaded_accounts); self.store_stakes(txs, executed, loaded_accounts); @@ -838,7 +977,7 @@ impl Bank { } fn store(&self, pubkey: &Pubkey, account: &Account) { - self.accounts.store_slow(self.slot(), pubkey, account); + self.rc.accounts.store_slow(self.slot(), pubkey, account); if Stakes::is_stake(account) { self.stakes.write().unwrap().store(pubkey, account); @@ -867,8 +1006,22 @@ impl Bank { self.store(pubkey, &account); } + pub fn accounts(&self) -> Arc { + self.rc.accounts.clone() + } + + pub fn set_bank_rc(&mut self, bank_rc: &BankRc, status_cache_rc: &StatusCacheRc) { + self.rc.accounts = bank_rc.accounts.clone(); + self.src.status_cache = status_cache_rc.status_cache.clone() + } + + pub fn set_parent(&mut self, parent: &Arc) { + self.rc.parent = RwLock::new(Some(parent.clone())); + } + pub fn get_account(&self, pubkey: &Pubkey) -> Option { - self.accounts + self.rc + .accounts .load_slow(&self.ancestors, pubkey) .map(|(account, _)| account) } @@ -877,12 +1030,12 @@ impl Bank { &self, program_id: &Pubkey, ) -> Vec<(Pubkey, Account)> { - self.accounts.load_by_program(self.slot(), program_id) + self.rc.accounts.load_by_program(self.slot(), program_id) } pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option<(Account, Fork)> { let just_self: HashMap = vec![(self.slot(), 0)].into_iter().collect(); - self.accounts.load_slow(&just_self, pubkey) + self.rc.accounts.load_slow(&just_self, pubkey) } pub fn transaction_count(&self) -> u64 { @@ -897,7 +1050,7 @@ impl Bank { &self, signature: &Signature, ) -> Option<(usize, Result<()>)> { - let rcache = self.status_cache.read().unwrap(); + let rcache = self.src.status_cache.read().unwrap(); rcache.get_signature_status_slow(signature, &self.ancestors) } @@ -915,11 +1068,11 @@ impl Bank { fn hash_internal_state(&self) -> Hash { // If there are no accounts, return the same hash as we did before // checkpointing. - if !self.accounts.has_accounts(self.slot()) { + if !self.rc.accounts.has_accounts(self.slot()) { return self.parent_hash; } - let accounts_delta_hash = self.accounts.hash_internal_state(self.slot()); + let accounts_delta_hash = self.rc.accounts.hash_internal_state(self.slot()); extend_and_hash(&self.parent_hash, &serialize(&accounts_delta_hash).unwrap()) } @@ -1016,12 +1169,48 @@ impl Bank { // Register a bogus executable account, which will be loaded and ignored. self.register_native_instruction_processor("", &program_id); } + + pub fn compare_bank(&self, dbank: &Bank) { + assert_eq!(self.slot, dbank.slot); + assert_eq!(self.collector_id, dbank.collector_id); + assert_eq!(self.epoch_schedule, dbank.epoch_schedule); + assert_eq!(self.ticks_per_slot, dbank.ticks_per_slot); + assert_eq!(self.parent_hash, dbank.parent_hash); + assert_eq!( + self.tick_height.load(Ordering::SeqCst), + dbank.tick_height.load(Ordering::SeqCst) + ); + assert_eq!( + self.is_delta.load(Ordering::SeqCst), + dbank.is_delta.load(Ordering::SeqCst) + ); + + let st = self.stakes.read().unwrap(); + let dst = dbank.stakes.read().unwrap(); + assert_eq!(*st, *dst); + + let bh = self.hash.read().unwrap(); + let dbh = dbank.hash.read().unwrap(); + assert_eq!(*bh, *dbh); + + let bhq = self.blockhash_queue.read().unwrap(); + let dbhq = dbank.blockhash_queue.read().unwrap(); + assert_eq!(*bhq, *dbhq); + + let sc = self.src.status_cache.read().unwrap(); + let dsc = dbank.src.status_cache.read().unwrap(); + assert_eq!(*sc, *dsc); + assert_eq!( + self.rc.accounts.hash_internal_state(self.slot), + dbank.rc.accounts.hash_internal_state(dbank.slot) + ); + } } impl Drop for Bank { fn drop(&mut self) { // For root forks this is a noop - self.accounts.purge_fork(self.slot()); + self.rc.accounts.purge_fork(self.slot()); } } @@ -1032,6 +1221,7 @@ mod tests { use crate::genesis_utils::{ create_genesis_block_with_leader, GenesisBlockInfo, BOOTSTRAP_LEADER_LAMPORTS, }; + use bincode::{deserialize_from, serialize_into, serialized_size}; use solana_sdk::genesis_block::create_genesis_block; use solana_sdk::hash; use solana_sdk::instruction::InstructionError; @@ -1040,6 +1230,7 @@ mod tests { use solana_sdk::system_transaction; use solana_vote_api::vote_instruction; use solana_vote_api::vote_state::VoteState; + use std::io::Cursor; #[test] fn test_bank_new() { @@ -1999,4 +2190,28 @@ mod tests { assert!(bank.is_delta.load(Ordering::Relaxed)); } + #[test] + fn test_bank_serialize() { + let (genesis_block, _) = create_genesis_block(500); + let bank0 = Arc::new(Bank::new(&genesis_block)); + let bank = new_from_parent(&bank0); + + // Test new account + let key = Keypair::new(); + bank.deposit(&key.pubkey(), 10); + assert_eq!(bank.get_balance(&key.pubkey()), 10); + + let len = serialized_size(&bank).unwrap() + serialized_size(&bank.rc).unwrap(); + let mut buf = vec![0u8; len as usize]; + let mut writer = Cursor::new(&mut buf[..]); + serialize_into(&mut writer, &bank).unwrap(); + serialize_into(&mut writer, &bank.rc).unwrap(); + + let mut reader = Cursor::new(&mut buf[..]); + let mut dbank: Bank = deserialize_from(&mut reader).unwrap(); + let dbank_rc: BankRc = deserialize_from(&mut reader).unwrap(); + dbank.rc = dbank_rc; + assert_eq!(dbank.get_balance(&key.pubkey()), 10); + bank.compare_bank(&dbank); + } } diff --git a/runtime/src/blockhash_queue.rs b/runtime/src/blockhash_queue.rs index 2143be4341..8ad557f880 100644 --- a/runtime/src/blockhash_queue.rs +++ b/runtime/src/blockhash_queue.rs @@ -1,15 +1,16 @@ -use hashbrown::HashMap; +use serde::{Deserialize, Serialize}; use solana_sdk::hash::Hash; use solana_sdk::timing::timestamp; +use std::collections::HashMap; -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] struct HashAge { timestamp: u64, hash_height: u64, } /// Low memory overhead, so can be cloned for every checkpoint -#[derive(Clone)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct BlockhashQueue { /// updated whenever an hash is registered hash_height: u64, diff --git a/runtime/src/bloom.rs b/runtime/src/bloom.rs index c1674cffd1..ede157eb6e 100644 --- a/runtime/src/bloom.rs +++ b/runtime/src/bloom.rs @@ -16,7 +16,8 @@ pub trait BloomHashIndex { #[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)] pub struct Bloom { pub keys: Vec, - pub bits: BitVec, + pub bits: BitVec, + num_bits_set: u64, _phantom: PhantomData, } @@ -26,6 +27,7 @@ impl Bloom { Bloom { keys, bits, + num_bits_set: 0, _phantom: PhantomData::default(), } } @@ -47,11 +49,15 @@ impl Bloom { } pub fn clear(&mut self) { self.bits = BitVec::new_fill(false, self.bits.len()); + self.num_bits_set = 0; } pub fn add(&mut self, key: &T) { for k in &self.keys { let pos = self.pos(key, *k); - self.bits.set(pos, true); + if !self.bits.get(pos) { + self.num_bits_set += 1; + self.bits.set(pos, true); + } } } pub fn contains(&self, key: &T) -> bool { diff --git a/runtime/src/epoch_schedule.rs b/runtime/src/epoch_schedule.rs index acd103ed6a..1c578c0c1b 100644 --- a/runtime/src/epoch_schedule.rs +++ b/runtime/src/epoch_schedule.rs @@ -2,7 +2,7 @@ use solana_vote_api::vote_state::MAX_LOCKOUT_HISTORY; pub const MINIMUM_SLOT_LENGTH: usize = MAX_LOCKOUT_HISTORY + 1; -#[derive(Default, Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Default, Debug, PartialEq, Eq, Clone, Copy, Deserialize, Serialize)] pub struct EpochSchedule { /// The maximum number of slots in each epoch. pub slots_per_epoch: u64, diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 0e44d53aef..89d4ff0f45 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -12,8 +12,9 @@ pub mod loader_utils; pub mod locked_accounts_results; pub mod message_processor; mod native_loader; +mod serde_utils; pub mod stakes; -mod status_cache; +pub mod status_cache; mod system_instruction_processor; #[macro_use] diff --git a/runtime/src/message_processor.rs b/runtime/src/message_processor.rs index 77d16f6bae..e6c565f65f 100644 --- a/runtime/src/message_processor.rs +++ b/runtime/src/message_processor.rs @@ -1,5 +1,6 @@ use crate::native_loader; use crate::system_instruction_processor; +use serde::{Deserialize, Serialize}; use solana_sdk::account::{create_keyed_accounts, Account, KeyedAccount}; use solana_sdk::instruction::{CompiledInstruction, InstructionError}; use solana_sdk::instruction_processor_utils; @@ -85,8 +86,11 @@ pub type ProcessInstruction = pub type SymbolCache = RwLock, Symbol>>; +#[derive(Serialize, Deserialize)] pub struct MessageProcessor { + #[serde(skip)] instruction_processors: Vec<(Pubkey, ProcessInstruction)>, + #[serde(skip)] symbol_cache: SymbolCache, } diff --git a/runtime/src/serde_utils.rs b/runtime/src/serde_utils.rs new file mode 100644 index 0000000000..f4376d83b1 --- /dev/null +++ b/runtime/src/serde_utils.rs @@ -0,0 +1,62 @@ +use std::fmt; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + +struct U64Visitor; +impl<'a> serde::de::Visitor<'a> for U64Visitor { + type Value = u64; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting u64") + } + fn visit_u64(self, data: u64) -> std::result::Result + where + E: serde::de::Error, + { + Ok(data) + } +} + +pub fn deserialize_atomicusize<'de, D>(d: D) -> Result +where + D: serde::de::Deserializer<'de>, +{ + let value = d.deserialize_u64(U64Visitor)?; + Ok(AtomicUsize::new(value as usize)) +} + +pub fn serialize_atomicusize(x: &AtomicUsize, s: S) -> Result +where + S: serde::Serializer, +{ + s.serialize_u64(x.load(Ordering::SeqCst) as u64) +} + +struct BoolVisitor; +impl<'a> serde::de::Visitor<'a> for BoolVisitor { + type Value = bool; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting bool") + } + fn visit_bool(self, data: bool) -> std::result::Result + where + E: serde::de::Error, + { + Ok(data) + } +} + +pub fn deserialize_atomicbool<'de, D>(d: D) -> Result +where + D: serde::de::Deserializer<'de>, +{ + let value = d.deserialize_bool(BoolVisitor)?; + Ok(AtomicBool::new(value)) +} + +pub fn serialize_atomicbool(x: &AtomicBool, s: S) -> Result +where + S: serde::Serializer, +{ + s.serialize_bool(x.load(Ordering::SeqCst)) +} diff --git a/runtime/src/stakes.rs b/runtime/src/stakes.rs index b2820b335c..541d879218 100644 --- a/runtime/src/stakes.rs +++ b/runtime/src/stakes.rs @@ -1,11 +1,11 @@ //! Stakes serve as a cache of stake and vote accounts to derive //! node stakes -use hashbrown::HashMap; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; use solana_stake_api::stake_state::StakeState; +use std::collections::HashMap; -#[derive(Default, Clone)] +#[derive(Default, Clone, PartialEq, Debug, Deserialize, Serialize)] pub struct Stakes { /// vote accounts vote_accounts: HashMap, diff --git a/runtime/src/status_cache.rs b/runtime/src/status_cache.rs index d37c6ccd2f..71b390cfe8 100644 --- a/runtime/src/status_cache.rs +++ b/runtime/src/status_cache.rs @@ -1,8 +1,10 @@ -use hashbrown::{HashMap, HashSet}; use log::*; use rand::{thread_rng, Rng}; +use serde::ser::SerializeSeq; +use serde::{Deserialize, Serialize}; use solana_sdk::hash::Hash; use solana_sdk::signature::Signature; +use std::collections::{HashMap, HashSet}; const MAX_CACHE_ENTRIES: usize = solana_sdk::timing::MAX_HASH_AGE_IN_SECONDS; const CACHED_SIGNATURE_SIZE: usize = 20; @@ -14,22 +16,36 @@ type SignatureSlice = [u8; CACHED_SIGNATURE_SIZE]; type SignatureMap = HashMap>; type StatusMap = HashMap)>; -pub struct StatusCache { +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct StatusCache { /// all signatures seen during a hash period - cache: StatusMap, + #[serde(serialize_with = "serialize_statusmap")] + cache: Vec>, roots: HashSet, } -impl Default for StatusCache { +fn serialize_statusmap(x: &[StatusMap], s: S) -> Result +where + T: serde::Serialize + Clone, + S: serde::Serializer, +{ + let cache0: StatusMap = HashMap::new(); + let mut seq = s.serialize_seq(Some(x.len()))?; + seq.serialize_element(&cache0)?; + seq.serialize_element(&x[1])?; + seq.end() +} + +impl Default for StatusCache { fn default() -> Self { Self { - cache: HashMap::default(), + cache: vec![HashMap::default(); 2], roots: HashSet::default(), } } } -impl StatusCache { +impl StatusCache { /// Check if the signature from a transaction is in any of the forks in the ancestors set. pub fn get_signature_status( &self, @@ -37,15 +53,26 @@ impl StatusCache { transaction_blockhash: &Hash, ancestors: &HashMap, ) -> Option<(ForkId, T)> { - let (_, index, sigmap) = self.cache.get(transaction_blockhash)?; - let mut sig_slice = [0u8; CACHED_SIGNATURE_SIZE]; - sig_slice.clone_from_slice(&sig.as_ref()[*index..*index + CACHED_SIGNATURE_SIZE]); - let stored_forks = sigmap.get(&sig_slice)?; - stored_forks - .iter() - .filter(|(f, _)| ancestors.get(f).is_some() || self.roots.get(f).is_some()) - .nth(0) - .cloned() + for cache in self.cache.iter() { + let map = cache.get(transaction_blockhash); + if map.is_none() { + continue; + } + let (_, index, sigmap) = map.unwrap(); + let mut sig_slice = [0u8; CACHED_SIGNATURE_SIZE]; + sig_slice.clone_from_slice(&sig.as_ref()[*index..*index + CACHED_SIGNATURE_SIZE]); + if let Some(stored_forks) = sigmap.get(&sig_slice) { + let res = stored_forks + .iter() + .filter(|(f, _)| ancestors.get(f).is_some() || self.roots.get(f).is_some()) + .nth(0) + .cloned(); + if res.is_some() { + return res; + } + } + } + None } /// TODO: wallets should send the Transactions recent blockhash as well @@ -55,7 +82,12 @@ impl StatusCache { ancestors: &HashMap, ) -> Option<(usize, T)> { trace!("get_signature_status_slow"); - for blockhash in self.cache.keys() { + let mut keys = vec![]; + for cache in self.cache.iter() { + let mut val: Vec<_> = cache.iter().map(|(k, _)| *k).collect(); + keys.append(&mut val); + } + for blockhash in keys.iter() { trace!("get_signature_status_slow: trying {}", blockhash); if let Some((forkid, res)) = self.get_signature_status(sig, blockhash, ancestors) { trace!("get_signature_status_slow: got {}", forkid); @@ -75,31 +107,80 @@ impl StatusCache { if self.roots.len() > MAX_CACHE_ENTRIES { if let Some(min) = self.roots.iter().min().cloned() { self.roots.remove(&min); - self.cache.retain(|_, (fork, _, _)| *fork > min); + for cache in self.cache.iter_mut() { + cache.retain(|_, (fork, _, _)| *fork > min); + } } } } /// Insert a new signature for a specific fork. pub fn insert(&mut self, transaction_blockhash: &Hash, sig: &Signature, fork: ForkId, res: T) { - let index: usize = - thread_rng().gen_range(0, std::mem::size_of::() - CACHED_SIGNATURE_SIZE); - let sig_map = - self.cache - .entry(*transaction_blockhash) - .or_insert((fork, index, HashMap::new())); + let sig_index: usize; + if let Some(sig_map) = self.cache[0].get(transaction_blockhash) { + sig_index = sig_map.1; + } else { + sig_index = + thread_rng().gen_range(0, std::mem::size_of::() - CACHED_SIGNATURE_SIZE); + } + let sig_map = self.cache[1].entry(*transaction_blockhash).or_insert(( + fork, + sig_index, + HashMap::new(), + )); sig_map.0 = std::cmp::max(fork, sig_map.0); let index = sig_map.1; let mut sig_slice = [0u8; CACHED_SIGNATURE_SIZE]; sig_slice.clone_from_slice(&sig.as_ref()[index..index + CACHED_SIGNATURE_SIZE]); - let sig_forks = sig_map.2.entry(sig_slice).or_insert(vec![]); + let sig_forks = sig_map.2.entry(sig_slice).or_insert_with(|| vec![]); sig_forks.push((fork, res)); } + fn insert_entry( + &mut self, + transaction_blockhash: &Hash, + sig_slice: &[u8; CACHED_SIGNATURE_SIZE], + status: Vec<(ForkId, T)>, + index: usize, + ) { + let fork = status + .iter() + .fold(0, |acc, (f, _)| if acc > *f { acc } else { *f }); + let sig_map = + self.cache[0] + .entry(*transaction_blockhash) + .or_insert((fork, index, HashMap::new())); + sig_map.0 = std::cmp::max(fork, sig_map.0); + let sig_forks = sig_map.2.entry(*sig_slice).or_insert_with(|| vec![]); + sig_forks.extend(status); + } + /// Clear for testing pub fn clear_signatures(&mut self) { - for v in self.cache.values_mut() { - v.2 = HashMap::new(); + for cache in self.cache.iter_mut() { + for v in cache.values_mut() { + v.2 = HashMap::new(); + } + } + } + + pub fn append(&mut self, status_cache: &StatusCache) { + for (hash, sigmap) in status_cache.cache[1].iter() { + for (signature, fork_status) in sigmap.2.iter() { + self.insert_entry(hash, signature, fork_status.clone(), sigmap.1); + } + } + + self.roots = self.roots.union(&status_cache.roots).cloned().collect(); + } + + pub fn merge_caches(&mut self) { + let mut cache = HashMap::new(); + std::mem::swap(&mut cache, &mut self.cache[1]); + for (hash, sigmap) in cache.iter() { + for (signature, fork_status) in sigmap.2.iter() { + self.insert_entry(hash, signature, fork_status.clone(), sigmap.1); + } } } } @@ -107,7 +188,9 @@ impl StatusCache { #[cfg(test)] mod tests { use super::*; + use bincode::{deserialize_from, serialize_into, serialized_size}; use solana_sdk::hash::hash; + use std::io::Cursor; type BankStatusCache = StatusCache<()>; @@ -260,9 +343,91 @@ mod tests { let blockhash = hash(Hash::default().as_ref()); status_cache.clear_signatures(); status_cache.insert(&blockhash, &sig, 0, ()); - let (_, index, sig_map) = status_cache.cache.get(&blockhash).unwrap(); + let (_, index, sig_map) = status_cache.cache[1].get(&blockhash).unwrap(); let mut sig_slice = [0u8; CACHED_SIGNATURE_SIZE]; sig_slice.clone_from_slice(&sig.as_ref()[*index..*index + CACHED_SIGNATURE_SIZE]); assert!(sig_map.get(&sig_slice).is_some()); } + + #[test] + fn test_statuscache_append() { + let sig = Signature::default(); + let mut status_cache0 = BankStatusCache::default(); + let blockhash0 = hash(Hash::new(&vec![0; 32]).as_ref()); + status_cache0.add_root(0); + status_cache0.insert(&blockhash0, &sig, 0, ()); + + let sig = Signature::default(); + let mut status_cache1 = BankStatusCache::default(); + let blockhash1 = hash(Hash::new(&vec![1; 32]).as_ref()); + status_cache1.insert(&blockhash0, &sig, 1, ()); + status_cache1.add_root(1); + status_cache1.insert(&blockhash1, &sig, 1, ()); + + status_cache0.append(&status_cache1); + let roots: HashSet<_> = [0, 1].iter().cloned().collect(); + assert_eq!(status_cache0.roots, roots); + let ancestors = vec![(0, 1), (1, 1)].into_iter().collect(); + assert!(status_cache0 + .get_signature_status(&sig, &blockhash0, &ancestors) + .is_some()); + assert!(status_cache0 + .get_signature_status(&sig, &blockhash1, &ancestors) + .is_some()); + } + + fn test_serialize(sc: &mut BankStatusCache, blockhash: Vec, sig: &Signature) { + let len = serialized_size(&sc).unwrap(); + let mut buf = vec![0u8; len as usize]; + let mut writer = Cursor::new(&mut buf[..]); + let cache0 = sc.cache[0].clone(); + serialize_into(&mut writer, sc).unwrap(); + for hash in blockhash.iter() { + if let Some(map0) = sc.cache[0].get(hash) { + if let Some(map1) = sc.cache[1].get(hash) { + assert_eq!(map0.1, map1.1); + } + } + } + sc.merge_caches(); + let len = writer.position() as usize; + + let mut reader = Cursor::new(&mut buf[..len]); + let mut status_cache: BankStatusCache = deserialize_from(&mut reader).unwrap(); + status_cache.cache[0] = cache0; + status_cache.merge_caches(); + assert!(status_cache.cache[0].len() > 0); + assert!(status_cache.cache[1].is_empty()); + let ancestors = vec![(0, 1), (1, 1)].into_iter().collect(); + assert_eq!(*sc, status_cache); + for hash in blockhash.iter() { + assert!(status_cache + .get_signature_status(&sig, &hash, &ancestors) + .is_some()); + } + } + + #[test] + fn test_statuscache_serialize() { + let sig = Signature::default(); + let mut status_cache = BankStatusCache::default(); + let blockhash0 = hash(Hash::new(&vec![0; 32]).as_ref()); + status_cache.add_root(0); + status_cache.clear_signatures(); + status_cache.insert(&blockhash0, &sig, 0, ()); + test_serialize(&mut status_cache, vec![blockhash0], &sig); + + status_cache.insert(&blockhash0, &sig, 1, ()); + test_serialize(&mut status_cache, vec![blockhash0], &sig); + + let blockhash1 = hash(Hash::new(&vec![1; 32]).as_ref()); + status_cache.insert(&blockhash1, &sig, 1, ()); + test_serialize(&mut status_cache, vec![blockhash0, blockhash1], &sig); + + let blockhash2 = hash(Hash::new(&vec![2; 32]).as_ref()); + let ancestors = vec![(0, 1), (1, 1)].into_iter().collect(); + assert!(status_cache + .get_signature_status(&sig, &blockhash2, &ancestors) + .is_none()); + } } diff --git a/validator/src/main.rs b/validator/src/main.rs index 00751306a9..01ac652436 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -156,7 +156,14 @@ fn main() { .validator(port_range_validator) .help("Range to use for dynamically assigned ports"), ) - .get_matches(); + .arg( + clap::Arg::with_name("snapshot_path") + .long("snapshot-path") + .value_name("PATHS") + .takes_value(true) + .help("Snapshot path"), + ) + .get_matches(); let mut validator_config = ValidatorConfig::default(); let keypair = if let Some(identity) = matches.value_of("identity") { @@ -220,6 +227,11 @@ fn main() { } else { validator_config.account_paths = None; } + if let Some(paths) = matches.value_of("snapshot_path") { + validator_config.snapshot_path = Some(paths.to_string()); + } else { + validator_config.snapshot_path = None; + } let cluster_entrypoint = matches.value_of("entrypoint").map(|entrypoint| { let entrypoint_addr = solana_netutil::parse_host_port(entrypoint) .expect("failed to parse entrypoint address");