From abf2b300dad48d9cafa552a72c1aec71c5fe57a0 Mon Sep 17 00:00:00 2001 From: Sathish <44555499+sambley@users.noreply.github.com> Date: Thu, 9 May 2019 19:27:06 -0700 Subject: [PATCH] Create bank snapshots (#3671) * Be able to create bank snapshots * fix clippy * load snapshot on start * regenerate account index from the storage * Remove rc feature dependency * cleanup * save snapshot for slot 0 --- core/src/bank_forks.rs | 265 +++++++++++++++++++++++- core/src/blocktree.rs | 2 +- core/src/cluster_info.rs | 3 +- core/src/crds_gossip.rs | 2 +- core/src/crds_gossip_pull.rs | 2 +- core/src/crds_gossip_push.rs | 2 +- core/src/fullnode.rs | 48 ++++- core/src/locktower.rs | 2 +- core/src/replay_stage.rs | 6 +- core/src/staking_utils.rs | 6 +- core/tests/cluster_info.rs | 2 +- core/tests/crds_gossip.rs | 2 +- core/tests/tvu.rs | 2 +- fullnode/src/main.rs | 10 +- run.sh | 25 ++- runtime/src/accounts.rs | 85 ++++++-- runtime/src/accounts_db.rs | 337 ++++++++++++++++++++++++++----- runtime/src/accounts_index.rs | 22 +- runtime/src/append_vec.rs | 109 +++++++++- runtime/src/bank.rs | 220 ++++++++++++++++---- runtime/src/blockhash_queue.rs | 7 +- runtime/src/bloom.rs | 10 +- runtime/src/lib.rs | 4 +- runtime/src/message_processor.rs | 4 + runtime/src/serde_utils.rs | 62 ++++++ runtime/src/status_cache.rs | 33 ++- 26 files changed, 1130 insertions(+), 142 deletions(-) create mode 100644 runtime/src/serde_utils.rs diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index ee643b7e7c..58852db224 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -1,10 +1,16 @@ //! The `bank_forks` module implments BankForks a DAG of checkpointed Banks -use hashbrown::{HashMap, HashSet}; +use bincode::{deserialize_from, serialize_into}; use solana_metrics::counter::Counter; -use solana_runtime::bank::Bank; +use solana_runtime::bank::{Bank, BankRc}; use solana_sdk::timing; +use std::collections::{HashMap, HashSet}; +use std::env; +use std::fs; +use std::fs::File; +use std::io::{BufReader, BufWriter, Error, ErrorKind}; use std::ops::Index; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Instant; @@ -12,6 +18,8 @@ pub struct BankForks { banks: HashMap>, working_bank: Arc, root: u64, + slots: HashSet, + use_snapshot: bool, } impl Index for BankForks { @@ -30,6 +38,8 @@ impl BankForks { banks, working_bank, root: 0, + slots: HashSet::new(), + use_snapshot: false, } } @@ -45,6 +55,7 @@ impl BankForks { } /// Create a map of bank slot id to the set of all of its descendants + #[allow(clippy::or_fun_call)] pub fn descendants(&self) -> HashMap> { let mut descendants = HashMap::new(); for bank in self.banks.values() { @@ -91,6 +102,8 @@ impl BankForks { root, banks, working_bank, + slots: HashSet::new(), + use_snapshot: false, } } @@ -128,9 +141,162 @@ impl BankForks { } fn prune_non_root(&mut self, root: u64) { + let slots: HashSet = self + .banks + .iter() + .filter(|(_, b)| b.is_frozen()) + .map(|(k, _)| *k) + .collect(); let descendants = self.descendants(); self.banks - .retain(|slot, _| descendants[&root].contains(slot)) + .retain(|slot, _| descendants[&root].contains(slot)); + if self.use_snapshot { + let diff: HashSet<_> = slots.symmetric_difference(&self.slots).collect(); + trace!("prune non root {} - {:?}", root, diff); + for slot in diff.iter() { + if **slot > root { + let _ = self.add_snapshot(**slot, root); + } else if **slot > 0 { + self.remove_snapshot(**slot); + } + } + } + self.slots = slots.clone(); + } + + fn get_io_error(error: &str) -> Error { + Error::new(ErrorKind::Other, error) + } + + fn get_snapshot_path() -> PathBuf { + let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); + let snapshot_dir = format!("{}/snapshots/", out_dir); + Path::new(&snapshot_dir).to_path_buf() + } + + pub fn add_snapshot(&self, slot: u64, root: u64) -> Result<(), Error> { + let path = BankForks::get_snapshot_path(); + fs::create_dir_all(path.clone())?; + let bank_file = format!("{}", slot); + let bank_file_path = path.join(bank_file); + trace!("path: {:?}", bank_file_path); + let file = File::create(bank_file_path)?; + let mut stream = BufWriter::new(file); + let bank = self.get(slot).unwrap().clone(); + serialize_into(&mut stream, &*bank) + .map_err(|_| BankForks::get_io_error("serialize bank error"))?; + let mut parent_slot: u64 = 0; + if let Some(parent_bank) = bank.parent() { + parent_slot = parent_bank.slot(); + } + serialize_into(&mut stream, &parent_slot) + .map_err(|_| BankForks::get_io_error("serialize bank parent error"))?; + serialize_into(&mut stream, &bank.rc) + .map_err(|_| BankForks::get_io_error("serialize bank rc error"))?; + serialize_into(&mut stream, &root) + .map_err(|_| BankForks::get_io_error("serialize root error"))?; + Ok(()) + } + + pub fn remove_snapshot(&self, slot: u64) { + let path = BankForks::get_snapshot_path(); + let bank_file = format!("{}", slot); + let bank_file_path = path.join(bank_file); + let _ = fs::remove_file(bank_file_path); + } + + pub fn set_snapshot_config(&mut self, use_snapshot: bool) { + self.use_snapshot = use_snapshot; + } + + fn setup_banks( + bank_maps: &mut Vec<(u64, u64, Bank)>, + bank_rc: &BankRc, + ) -> (HashMap>, HashSet, u64) { + let mut banks = HashMap::new(); + let mut slots = HashSet::new(); + let (last_slot, last_parent_slot, mut last_bank) = bank_maps.remove(0); + last_bank.set_bank_rc(&bank_rc); + + while let Some((slot, parent_slot, mut bank)) = bank_maps.pop() { + bank.set_bank_rc(&bank_rc); + if parent_slot != 0 { + if let Some(parent) = banks.get(&parent_slot) { + bank.set_parent(parent); + } + } + banks.insert(slot, Arc::new(bank)); + slots.insert(slot); + } + if last_parent_slot != 0 { + if let Some(parent) = banks.get(&last_parent_slot) { + last_bank.set_parent(parent); + } + } + banks.insert(last_slot, Arc::new(last_bank)); + slots.insert(last_slot); + + (banks, slots, last_slot) + } + + pub fn load_from_snapshot() -> Result { + let path = BankForks::get_snapshot_path(); + let paths = fs::read_dir(path.clone())?; + let mut names = paths + .filter_map(|entry| { + entry.ok().and_then(|e| { + e.path() + .file_name() + .and_then(|n| n.to_str().map(|s| s.parse::().unwrap())) + }) + }) + .collect::>(); + + names.sort(); + let mut bank_maps = vec![]; + let mut bank_rc: Option = None; + let mut root: u64 = 0; + for bank_slot in names.iter().rev() { + let bank_path = format!("{}", bank_slot); + let bank_file_path = path.join(bank_path.clone()); + info!("Load from {:?}", bank_file_path); + let file = File::open(bank_file_path)?; + let mut stream = BufReader::new(file); + let bank: Result = deserialize_from(&mut stream) + .map_err(|_| BankForks::get_io_error("deserialize bank error")); + let slot: Result = deserialize_from(&mut stream) + .map_err(|_| BankForks::get_io_error("deserialize bank parent error")); + let parent_slot = if slot.is_ok() { slot.unwrap() } else { 0 }; + if bank_rc.is_none() { + let rc: Result = deserialize_from(&mut stream) + .map_err(|_| BankForks::get_io_error("deserialize bank rc error")); + if rc.is_ok() { + bank_rc = Some(rc.unwrap()); + let r: Result = deserialize_from(&mut stream) + .map_err(|_| BankForks::get_io_error("deserialize root error")); + if r.is_ok() { + root = r.unwrap(); + } + } + } + match bank { + Ok(v) => bank_maps.push((*bank_slot, parent_slot, v)), + Err(_) => warn!("Load snapshot failed for {}", bank_slot), + } + } + if bank_maps.is_empty() || bank_rc.is_none() { + return Err(Error::new(ErrorKind::Other, "no snapshots loaded")); + } + + let (banks, slots, last_slot) = BankForks::setup_banks(&mut bank_maps, &bank_rc.unwrap()); + let working_bank = banks[&last_slot].clone(); + Ok(BankForks { + banks, + working_bank, + root, + slots, + use_snapshot: true, + }) } } @@ -140,6 +306,10 @@ mod tests { use crate::genesis_utils::create_genesis_block; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::system_transaction; + use std::env; + use std::fs::remove_dir_all; #[test] fn test_bank_forks() { @@ -164,8 +334,8 @@ mod tests { let bank = Bank::new_from_parent(&bank0, &Pubkey::default(), 2); bank_forks.insert(bank); let descendants = bank_forks.descendants(); - let children: Vec = descendants[&0].iter().cloned().collect(); - assert_eq!(children, vec![1, 2]); + let children: HashSet = [1u64, 2u64].to_vec().into_iter().collect(); + assert_eq!(children, *descendants.get(&0).unwrap()); assert!(descendants[&1].is_empty()); assert!(descendants[&2].is_empty()); } @@ -209,4 +379,89 @@ mod tests { assert_eq!(bank_forks.active_banks(), vec![1]); } + struct TempPaths { + pub paths: String, + } + + #[macro_export] + macro_rules! tmp_bank_accounts_name { + () => { + &format!("{}-{}", file!(), line!()) + }; + } + + #[macro_export] + macro_rules! get_tmp_bank_accounts_path { + () => { + get_tmp_bank_accounts_path(tmp_bank_accounts_name!()) + }; + } + + impl Drop for TempPaths { + fn drop(&mut self) { + let paths: Vec = self.paths.split(',').map(|s| s.to_string()).collect(); + paths.iter().for_each(|p| { + let _ignored = remove_dir_all(p); + }); + } + } + + fn get_paths_vec(paths: &str) -> Vec { + paths.split(',').map(|s| s.to_string()).collect() + } + + fn get_tmp_bank_accounts_path(paths: &str) -> TempPaths { + let vpaths = get_paths_vec(paths); + let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); + let vpaths: Vec<_> = vpaths + .iter() + .map(|path| format!("{}/{}", out_dir, path)) + .collect(); + TempPaths { + paths: vpaths.join(","), + } + } + + fn save_and_load_snapshot(bank_forks: &BankForks) { + for (slot, _) in bank_forks.banks.iter() { + bank_forks.add_snapshot(*slot, 0).unwrap(); + } + + let new = BankForks::load_from_snapshot().unwrap(); + for (slot, _) in bank_forks.banks.iter() { + let bank = bank_forks.banks.get(slot).unwrap().clone(); + let new_bank = new.banks.get(slot).unwrap(); + bank.compare_bank(&new_bank); + } + for (slot, _) in new.banks.iter() { + new.remove_snapshot(*slot); + } + } + + #[test] + fn test_bank_forks_snapshot_n() { + solana_logger::setup(); + let path = get_tmp_bank_accounts_path!(); + let (genesis_block, mint_keypair) = create_genesis_block(10_000); + let bank0 = Bank::new_with_paths(&genesis_block, Some(path.paths.clone())); + bank0.freeze(); + let mut bank_forks = BankForks::new(0, bank0); + bank_forks.set_snapshot_config(true); + for index in 0..10 { + let bank = Bank::new_from_parent(&bank_forks[index], &Pubkey::default(), index + 1); + let key1 = Keypair::new().pubkey(); + let tx = system_transaction::create_user_account( + &mint_keypair, + &key1, + 1, + genesis_block.hash(), + 0, + ); + assert_eq!(bank.process_transaction(&tx), Ok(())); + bank.freeze(); + bank_forks.insert(bank); + save_and_load_snapshot(&bank_forks); + } + assert_eq!(bank_forks.working_bank().slot(), 10); + } } diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 81e04c0d1b..01204cad58 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -11,7 +11,7 @@ use solana_kvstore as kvstore; use bincode::deserialize; -use hashbrown::HashMap; +use std::collections::HashMap; #[cfg(not(feature = "kvstore"))] use rocksdb; diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 4956d239f4..c5869b957b 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -26,7 +26,6 @@ use crate::staking_utils; use crate::streamer::{BlobReceiver, BlobSender}; use bincode::{deserialize, serialize}; use core::cmp; -use hashbrown::HashMap; use rand::{thread_rng, Rng}; use rayon::prelude::*; use solana_metrics::counter::Counter; @@ -41,7 +40,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::timing::{duration_as_ms, timestamp}; use solana_sdk::transaction::Transaction; use std::cmp::min; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt; use std::io; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 665bbd5658..57dd90134b 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -8,10 +8,10 @@ use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_pull::CrdsGossipPull; use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}; use crate::crds_value::CrdsValue; -use hashbrown::HashMap; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; +use std::collections::HashMap; ///The min size for bloom filters pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000; diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 73584391bb..44b5936c07 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -16,13 +16,13 @@ use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use crate::packet::BLOB_DATA_SIZE; use bincode::serialized_size; -use hashbrown::HashMap; use rand; use rand::distributions::{Distribution, WeightedIndex}; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use std::cmp; +use std::collections::HashMap; use std::collections::VecDeque; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index c8ca02971d..e8acd98ab7 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -15,7 +15,6 @@ use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use crate::packet::BLOB_DATA_SIZE; use bincode::serialized_size; -use hashbrown::HashMap; use indexmap::map::IndexMap; use rand; use rand::distributions::{Distribution, WeightedIndex}; @@ -25,6 +24,7 @@ use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; use std::cmp; +use std::collections::HashMap; pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30; pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6; diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index b291f33d59..76181f231d 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -45,6 +45,7 @@ pub struct FullnodeConfig { pub tick_config: PohServiceConfig, pub account_paths: Option, pub rpc_config: JsonRpcConfig, + pub use_snapshot: bool, } impl Default for FullnodeConfig { fn default() -> Self { @@ -60,6 +61,7 @@ impl Default for FullnodeConfig { tick_config: PohServiceConfig::default(), account_paths: None, rpc_config: JsonRpcConfig::default(), + use_snapshot: false, } } } @@ -102,7 +104,11 @@ impl Fullnode { ledger_signal_receiver, completed_slots_receiver, leader_schedule_cache, - ) = new_banks_from_blocktree(ledger_path, config.account_paths.clone()); + ) = new_banks_from_blocktree( + ledger_path, + config.account_paths.clone(), + config.use_snapshot, + ); let leader_schedule_cache = Arc::new(leader_schedule_cache); let exit = Arc::new(AtomicBool::new(false)); @@ -117,7 +123,7 @@ impl Fullnode { ); let blocktree = Arc::new(blocktree); - let (poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal( + let (mut poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal( bank.tick_height(), bank.last_blockhash(), bank.slot(), @@ -128,6 +134,10 @@ impl Fullnode { blocktree.new_blobs_signals.first().cloned(), &leader_schedule_cache, ); + if config.use_snapshot { + poh_recorder.set_bank(&bank); + } + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit); assert_eq!( @@ -289,9 +299,40 @@ impl Fullnode { } } +fn get_bank_forks( + genesis_block: &GenesisBlock, + blocktree: &Blocktree, + account_paths: Option, + use_snapshot: bool, +) -> (BankForks, Vec, LeaderScheduleCache) { + if use_snapshot { + let bank_forks = BankForks::load_from_snapshot(); + match bank_forks { + Ok(v) => { + let bank = &v.working_bank(); + let fork_info = BankForksInfo { + bank_slot: bank.slot(), + entry_height: bank.tick_height(), + }; + return (v, vec![fork_info], LeaderScheduleCache::new_from_bank(bank)); + } + Err(_) => warn!("Failed to load from snapshot, fallback to load from ledger"), + } + } + let (mut bank_forks, bank_forks_info, leader_schedule_cache) = + blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths) + .expect("process_blocktree failed"); + if use_snapshot { + bank_forks.set_snapshot_config(true); + let _ = bank_forks.add_snapshot(0, 0); + } + (bank_forks, bank_forks_info, leader_schedule_cache) +} + pub fn new_banks_from_blocktree( blocktree_path: &str, account_paths: Option, + use_snapshot: bool, ) -> ( BankForks, Vec, @@ -308,8 +349,7 @@ pub fn new_banks_from_blocktree( .expect("Expected to successfully open database ledger"); let (bank_forks, bank_forks_info, leader_schedule_cache) = - blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths) - .expect("process_blocktree failed"); + get_bank_forks(&genesis_block, &blocktree, account_paths, use_snapshot); ( bank_forks, diff --git a/core/src/locktower.rs b/core/src/locktower.rs index 083aa59579..20f063428c 100644 --- a/core/src/locktower.rs +++ b/core/src/locktower.rs @@ -1,11 +1,11 @@ use crate::bank_forks::BankForks; use crate::staking_utils; -use hashbrown::{HashMap, HashSet}; use solana_metrics::influxdb; use solana_runtime::bank::Bank; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; use solana_vote_api::vote_state::{Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY}; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; pub const VOTE_THRESHOLD_DEPTH: usize = 8; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index ed9c963ba0..d5153f7c6f 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -13,7 +13,6 @@ use crate::poh_recorder::PohRecorder; use crate::result::{Error, Result}; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; -use hashbrown::HashMap; use solana_metrics::counter::Counter; use solana_metrics::influxdb; use solana_runtime::bank::Bank; @@ -23,6 +22,7 @@ use solana_sdk::signature::KeypairUtil; use solana_sdk::timing::{self, duration_as_ms}; use solana_sdk::transaction::Transaction; use solana_vote_api::vote_instruction; +use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex, RwLock}; @@ -501,7 +501,7 @@ impl ReplayStage { let bank_slot = bank.slot(); let bank_progress = &mut progress .entry(bank_slot) - .or_insert(ForkProgress::new(bank.last_blockhash())); + .or_insert_with(|| ForkProgress::new(bank.last_blockhash())); blocktree.get_slot_entries_with_blob_count(bank_slot, bank_progress.num_blobs as u64, None) } @@ -513,7 +513,7 @@ impl ReplayStage { ) -> Result<()> { let bank_progress = &mut progress .entry(bank.slot()) - .or_insert(ForkProgress::new(bank.last_blockhash())); + .or_insert_with(|| ForkProgress::new(bank.last_blockhash())); let result = Self::verify_and_process_entries(&bank, &entries, &bank_progress.last_entry); bank_progress.num_blobs += num; if let Some(last_entry) = entries.last() { diff --git a/core/src/staking_utils.rs b/core/src/staking_utils.rs index 17416c396c..1d3700aa83 100644 --- a/core/src/staking_utils.rs +++ b/core/src/staking_utils.rs @@ -1,9 +1,9 @@ -use hashbrown::HashMap; use solana_runtime::bank::Bank; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; use solana_vote_api::vote_state::VoteState; use std::borrow::Borrow; +use std::collections::HashMap; /// Looks through vote accounts, and finds the latest slot that has achieved /// supermajority lockout @@ -66,7 +66,7 @@ pub fn node_staked_accounts_at_epoch( ) -> Option> { bank.epoch_vote_accounts(epoch_height).map(|epoch_state| { epoch_state - .into_iter() + .iter() .filter_map(|(account_id, account)| { filter_zero_balances(account).map(|stake| (account_id, stake, account)) }) @@ -152,9 +152,9 @@ pub mod tests { create_genesis_block, create_genesis_block_with_leader, BOOTSTRAP_LEADER_LAMPORTS, }; use crate::voting_keypair::tests as voting_keypair_tests; - use hashbrown::HashSet; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; + use std::collections::HashSet; use std::iter::FromIterator; use std::sync::Arc; diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs index a9f958240d..377dca66a1 100644 --- a/core/tests/cluster_info.rs +++ b/core/tests/cluster_info.rs @@ -1,9 +1,9 @@ -use hashbrown::{HashMap, HashSet}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon::prelude::*; use solana::cluster_info::{compute_retransmit_peers, ClusterInfo}; use solana::contact_info::ContactInfo; use solana_sdk::pubkey::Pubkey; +use std::collections::{HashMap, HashSet}; use std::sync::mpsc::channel; use std::sync::mpsc::TryRecvError; use std::sync::mpsc::{Receiver, Sender}; diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index e056e02d3a..e1b85d8103 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -1,5 +1,4 @@ use bincode::serialized_size; -use hashbrown::HashMap; use log::*; use rayon::prelude::*; use solana::contact_info::ContactInfo; @@ -11,6 +10,7 @@ use solana::crds_value::CrdsValueLabel; use solana_sdk::hash::hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; +use std::collections::HashMap; use std::sync::{Arc, Mutex}; type Node = Arc>; diff --git a/core/tests/tvu.rs b/core/tests/tvu.rs index 9f220ab681..b52f94d728 100644 --- a/core/tests/tvu.rs +++ b/core/tests/tvu.rs @@ -90,7 +90,7 @@ fn test_replay() { ledger_signal_receiver, completed_slots_receiver, leader_schedule_cache, - ) = fullnode::new_banks_from_blocktree(&blocktree_path, None); + ) = fullnode::new_banks_from_blocktree(&blocktree_path, None, false); let working_bank = bank_forks.working_bank(); assert_eq!( working_bank.get_balance(&mint_keypair.pubkey()), diff --git a/fullnode/src/main.rs b/fullnode/src/main.rs index 5f72105d63..6a19091bf5 100644 --- a/fullnode/src/main.rs +++ b/fullnode/src/main.rs @@ -148,7 +148,13 @@ fn main() { .validator(port_range_validator) .help("Range to use for dynamically assigned ports"), ) - .get_matches(); + .arg( + clap::Arg::with_name("use_snapshot") + .long("use-snapshot") + .takes_value(false) + .help("Load / Store bank snapshots"), + ) + .get_matches(); let mut fullnode_config = FullnodeConfig::default(); let keypair = if let Some(identity) = matches.value_of("identity") { @@ -178,6 +184,8 @@ fn main() { fullnode_config.sigverify_disabled = matches.is_present("no_sigverify"); + fullnode_config.use_snapshot = matches.is_present("use_snapshot"); + fullnode_config.voting_disabled = matches.is_present("no_voting"); if matches.is_present("enable_rpc_exit") { diff --git a/run.sh b/run.sh index 412bf98a42..91970e0432 100755 --- a/run.sh +++ b/run.sh @@ -39,9 +39,27 @@ export RUST_BACKTRACE=1 dataDir=$PWD/target/"$(basename "$0" .sh)" set -x -solana-keygen -o "$dataDir"/config/leader-keypair.json -solana-keygen -o "$dataDir"/config/leader-vote-account-keypair.json -solana-keygen -o "$dataDir"/config/leader-stake-account-keypair.json +leader_keypair="$dataDir/config/leader-keypair.json" +if [ -e "$leader_keypair" ] +then + echo "Use existing leader keypair" +else + solana-keygen -o "$leader_keypair" +fi +leader_vote_account_keypair="$dataDir/config/leader-vote-account-keypair.json" +if [ -e "$leader_vote_account_keypair" ] +then + echo "Use existing leader vote account keypair" +else + solana-keygen -o "$leader_vote_account_keypair" +fi +leader_stake_account_keypair="$dataDir/config/leader-stake-account-keypair.json" +if [ -e "$leader_stake_account_keypair" ] +then + echo "Use existing leader stake account keypair" +else + solana-keygen -o "$leader_stake_account_keypair" +fi solana-keygen -o "$dataDir"/config/drone-keypair.json leaderVoteAccountPubkey=$(\ @@ -70,6 +88,7 @@ args=( --ledger "$dataDir"/ledger/ --rpc-port 8899 --rpc-drone-address 127.0.0.1:9900 + --use-snapshot ) if [[ -n $blockstreamSocket ]]; then args+=(--blockstream "$blockstreamSocket") diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index ad8043505d..82008f2bb2 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -1,13 +1,13 @@ use crate::accounts_db::{ - get_paths_vec, AccountInfo, AccountStorage, AccountsDB, ErrorCounters, InstructionAccounts, - InstructionLoaders, + get_paths_vec, AccountInfo, AccountStorage, AccountsDB, AppendVecId, ErrorCounters, + InstructionAccounts, InstructionLoaders, }; use crate::accounts_index::{AccountsIndex, Fork}; use crate::append_vec::StoredAccount; use crate::message_processor::has_duplicates; use bincode::serialize; -use hashbrown::{HashMap, HashSet}; use log::*; +use serde::{Deserialize, Serialize}; use solana_metrics::counter::Counter; use solana_sdk::account::Account; use solana_sdk::fee_calculator::FeeCalculator; @@ -18,6 +18,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::transaction::Result; use solana_sdk::transaction::{Transaction, TransactionError}; use std::borrow::Borrow; +use std::collections::{HashMap, HashSet}; use std::env; use std::fs::remove_dir_all; use std::iter::once; @@ -50,15 +51,18 @@ type RecordLocks = ( ); /// This structure handles synchronization for db -#[derive(Default)] +#[derive(Default, Debug, Serialize, Deserialize)] pub struct Accounts { /// Single global AccountsDB + #[serde(skip)] pub accounts_db: Arc, /// set of accounts which are currently in the pipeline + #[serde(skip)] account_locks: AccountLocks, /// set of accounts which are about to record + commit + #[serde(skip)] record_locks: Mutex, /// List of persistent stores @@ -71,16 +75,17 @@ pub struct Accounts { impl Drop for Accounts { fn drop(&mut self) { - let paths = get_paths_vec(&self.paths); - paths.iter().for_each(|p| { - let _ignored = remove_dir_all(p); + if self.own_paths && (Arc::strong_count(&self.accounts_db) == 1) { + let paths = get_paths_vec(&self.paths); + paths.iter().for_each(|p| { + debug!("drop called for {:?}", p); + let _ignored = remove_dir_all(p); - // it is safe to delete the parent - if self.own_paths { + // it is safe to delete the parent let path = Path::new(p); let _ignored = remove_dir_all(path.parent().unwrap()); - } - }); + }); + } } } @@ -324,7 +329,9 @@ impl Accounts { pub fn load_by_program(&self, fork: Fork, program_id: &Pubkey) -> Vec<(Pubkey, Account)> { let accumulator: Vec> = self.accounts_db.scan_account_storage( fork, - |stored_account: &StoredAccount, accum: &mut Vec<(Pubkey, u64, Account)>| { + |stored_account: &StoredAccount, + _id: AppendVecId, + accum: &mut Vec<(Pubkey, u64, Account)>| { if stored_account.balance.owner == *program_id { let val = ( stored_account.meta.pubkey, @@ -435,7 +442,9 @@ impl Accounts { pub fn hash_internal_state(&self, fork_id: Fork) -> Option { let accumulator: Vec> = self.accounts_db.scan_account_storage( fork_id, - |stored_account: &StoredAccount, accum: &mut Vec<(Pubkey, u64, Hash)>| { + |stored_account: &StoredAccount, + _id: AppendVecId, + accum: &mut Vec<(Pubkey, u64, Hash)>| { accum.push(( stored_account.meta.pubkey, stored_account.meta.write_version, @@ -571,11 +580,14 @@ mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; + use bincode::{deserialize_from, serialize_into, serialized_size}; + use rand::{thread_rng, Rng}; use solana_sdk::account::Account; use solana_sdk::hash::Hash; use solana_sdk::instruction::CompiledInstruction; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::transaction::Transaction; + use std::io::Cursor; use std::thread::{sleep, Builder}; use std::time::{Duration, Instant}; @@ -1110,4 +1122,51 @@ mod tests { assert!(parent_record_locks2.is_empty()); } } + + fn create_accounts(accounts: &Accounts, pubkeys: &mut Vec, num: usize) { + for t in 0..num { + let pubkey = Pubkey::new_rand(); + let account = Account::new((t + 1) as u64, 0, &Account::default().owner); + accounts.store_slow(0, &pubkey, &account); + pubkeys.push(pubkey.clone()); + } + } + + fn check_accounts(accounts: &Accounts, pubkeys: &Vec, num: usize) { + for _ in 1..num { + let idx = thread_rng().gen_range(0, num - 1); + let ancestors = vec![(0, 0)].into_iter().collect(); + let account = accounts.load_slow(&ancestors, &pubkeys[idx]).unwrap(); + let account1 = Account::new((idx + 1) as u64, 0, &Account::default().owner); + assert_eq!(account, (account1, 0)); + } + } + + #[test] + fn test_accounts_serialize() { + solana_logger::setup(); + let accounts = Accounts::new(Some("serialize_accounts".to_string())); + + let mut pubkeys: Vec = vec![]; + create_accounts(&accounts, &mut pubkeys, 100); + check_accounts(&accounts, &pubkeys, 100); + accounts.add_root(0); + + let sz = + serialized_size(&accounts).unwrap() + serialized_size(&*accounts.accounts_db).unwrap(); + let mut buf = vec![0u8; sz as usize]; + let mut writer = Cursor::new(&mut buf[..]); + serialize_into(&mut writer, &accounts).unwrap(); + serialize_into(&mut writer, &*accounts.accounts_db).unwrap(); + + let mut reader = Cursor::new(&mut buf[..]); + let mut daccounts: Accounts = deserialize_from(&mut reader).unwrap(); + let accounts_db: AccountsDB = deserialize_from(&mut reader).unwrap(); + daccounts.accounts_db = Arc::new(accounts_db); + check_accounts(&daccounts, &pubkeys, 100); + assert_eq!( + accounts.hash_internal_state(0), + daccounts.hash_internal_state(0) + ); + } } diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index e3b848b118..a722bdc572 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -20,13 +20,19 @@ use crate::accounts_index::{AccountsIndex, Fork}; use crate::append_vec::{AppendVec, StorageMeta, StoredAccount}; -use hashbrown::{HashMap, HashSet}; +use bincode::{deserialize_from, serialize_into, serialized_size}; use log::*; use rand::{thread_rng, Rng}; use rayon::prelude::*; +use serde::de::{MapAccess, Visitor}; +use serde::ser::{SerializeMap, Serializer}; +use serde::{Deserialize, Serialize}; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; +use std::collections::{HashMap, HashSet}; +use std::fmt; use std::fs::{create_dir_all, remove_dir_all}; +use std::io::Cursor; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -49,7 +55,7 @@ pub struct ErrorCounters { pub missing_signature_for_fee: usize, } -#[derive(Default, Clone)] +#[derive(Deserialize, Serialize, Default, Debug, PartialEq, Clone)] pub struct AccountInfo { /// index identifying the append storage id: AppendVecId, @@ -62,18 +68,67 @@ pub struct AccountInfo { lamports: u64, } /// An offset into the AccountsDB::storage vector -type AppendVecId = usize; -pub type AccountStorage = HashMap>; +pub type AppendVecId = usize; pub type InstructionAccounts = Vec; pub type InstructionLoaders = Vec>; -#[derive(Debug, PartialEq, Clone, Copy)] +#[derive(Default, Debug)] +pub struct AccountStorage(HashMap>); + +struct AccountStorageVisitor; + +impl<'de> Visitor<'de> for AccountStorageVisitor { + type Value = AccountStorage; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting AccountStorage") + } + + #[allow(clippy::mutex_atomic)] + fn visit_map(self, mut access: M) -> Result + where + M: MapAccess<'de>, + { + let mut map = HashMap::new(); + + while let Some((key, value)) = access.next_entry()? { + map.insert(key, Arc::new(value)); + } + + Ok(AccountStorage(map)) + } +} + +impl Serialize for AccountStorage { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut map = serializer.serialize_map(Some(self.0.len()))?; + for (k, v) in &self.0 { + map.serialize_entry(k, &**v)?; + } + map.end() + } +} + +impl<'de> Deserialize<'de> for AccountStorage { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + deserializer.deserialize_map(AccountStorageVisitor) + } +} + +#[derive(Debug, PartialEq, Copy, Clone, Deserialize, Serialize)] pub enum AccountStorageStatus { StorageAvailable = 0, StorageFull = 1, } /// Persistent storage structure holding the accounts +#[derive(Debug, Deserialize, Serialize)] pub struct AccountStorageEntry { id: AppendVecId, @@ -165,7 +220,7 @@ impl AccountStorageEntry { } // This structure handles the load/store of the accounts -#[derive(Default)] +#[derive(Default, Debug)] pub struct AccountsDB { /// Keeps tracks of index into AppendVec on a per fork basis pub accounts_index: RwLock>, @@ -195,7 +250,7 @@ impl AccountsDB { let paths = get_paths_vec(&paths); AccountsDB { accounts_index: RwLock::new(AccountsIndex::default()), - storage: RwLock::new(HashMap::new()), + storage: RwLock::new(AccountStorage(HashMap::new())), next_id: AtomicUsize::new(0), write_version: AtomicUsize::new(0), paths, @@ -217,7 +272,7 @@ impl AccountsDB { } pub fn has_accounts(&self, fork: Fork) -> bool { - for x in self.storage.read().unwrap().values() { + for x in self.storage.read().unwrap().0.values() { if x.fork_id == fork && x.count() > 0 { return true; } @@ -229,7 +284,7 @@ impl AccountsDB { // PERF: Sequentially read each storage entry in parallel pub fn scan_account_storage(&self, fork_id: Fork, scan_func: F) -> Vec where - F: Fn(&StoredAccount, &mut B) -> (), + F: Fn(&StoredAccount, AppendVecId, &mut B) -> (), F: Send + Sync, B: Send + Default, { @@ -237,6 +292,7 @@ impl AccountsDB { .storage .read() .unwrap() + .0 .values() .filter(|store| store.fork_id == fork_id) .cloned() @@ -248,7 +304,7 @@ impl AccountsDB { let mut retval = B::default(); accounts .iter() - .for_each(|stored_account| scan_func(stored_account, &mut retval)); + .for_each(|stored_account| scan_func(stored_account, storage.id, &mut retval)); retval }) .collect() @@ -263,6 +319,7 @@ impl AccountsDB { let (info, fork) = accounts_index.get(pubkey, ancestors)?; //TODO: thread this as a ref storage + .0 .get(&info.id) .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) .map(|account| (account, fork)) @@ -282,6 +339,7 @@ impl AccountsDB { let mut candidates: Vec> = { let stores = self.storage.read().unwrap(); stores + .0 .values() .filter_map(|x| { if x.status() == AccountStorageStatus::StorageAvailable && x.fork_id == fork_id @@ -297,7 +355,7 @@ impl AccountsDB { let mut stores = self.storage.write().unwrap(); let path_idx = thread_rng().gen_range(0, self.paths.len()); let storage = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_idx])); - stores.insert(storage.id, storage.clone()); + stores.0.insert(storage.id, storage.clone()); candidates.push(storage); } let rv = thread_rng().gen_range(0, candidates.len()); @@ -309,7 +367,7 @@ impl AccountsDB { let is_root = self.accounts_index.read().unwrap().is_root(fork); trace!("PURGING {} {}", fork, is_root); if !is_root { - self.storage.write().unwrap().retain(|_, v| { + self.storage.write().unwrap().0.retain(|_, v| { trace!("PURGING {} {}", v.fork_id, fork); v.fork_id != fork }); @@ -371,7 +429,7 @@ impl AccountsDB { fn remove_dead_accounts(&self, reclaims: Vec<(Fork, AccountInfo)>) -> HashSet { let storage = self.storage.read().unwrap(); for (fork_id, account_info) in reclaims { - if let Some(store) = storage.get(&account_info.id) { + if let Some(store) = storage.0.get(&account_info.id) { assert_eq!( fork_id, store.fork_id, "AccountDB::accounts_index corrupted. Storage should only point to one fork" @@ -381,6 +439,7 @@ impl AccountsDB { } //TODO: performance here could be improved if AccountsDB::storage was organized by fork let dead_forks: HashSet = storage + .0 .values() .filter_map(|x| { if x.count() == 0 { @@ -391,6 +450,7 @@ impl AccountsDB { }) .collect(); let live_forks: HashSet = storage + .0 .values() .filter_map(|x| if x.count() > 0 { Some(x.fork_id) } else { None }) .collect(); @@ -422,12 +482,143 @@ impl AccountsDB { pub fn add_root(&self, fork: Fork) { self.accounts_index.write().unwrap().add_root(fork) } + + fn merge( + dest: &mut HashMap, + source: &HashMap, + ) { + for (key, (source_version, source_info)) in source.iter() { + if let Some((dest_version, _)) = dest.get(key) { + if dest_version > source_version { + continue; + } + } + dest.insert(*key, (*source_version, source_info.clone())); + } + } + + pub fn generate_index(&mut self) { + let mut forks: Vec = self + .storage + .read() + .unwrap() + .0 + .values() + .map(|x| x.fork_id) + .collect(); + + forks.sort(); + for fork_id in forks.iter() { + let mut accumulator: Vec> = self + .scan_account_storage( + *fork_id, + |stored_account: &StoredAccount, + id: AppendVecId, + accum: &mut HashMap| { + let account_info = AccountInfo { + id, + offset: stored_account.offset, + lamports: stored_account.balance.lamports, + }; + accum.insert( + stored_account.meta.pubkey, + (stored_account.meta.write_version, account_info), + ); + }, + ); + + let mut account_maps = accumulator.pop().unwrap(); + while let Some(maps) = accumulator.pop() { + AccountsDB::merge(&mut account_maps, &maps); + } + let mut accounts_index = self.accounts_index.write().unwrap(); + for (pubkey, (_, account_info)) in account_maps.iter() { + accounts_index.add_index(*fork_id, pubkey, account_info.clone()); + } + } + } +} + +impl Serialize for AccountsDB { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::ser::Serializer, + { + use serde::ser::Error; + let len = serialized_size(&self.accounts_index).unwrap() + + serialized_size(&self.paths).unwrap() + + serialized_size(&self.storage).unwrap() + + std::mem::size_of::() as u64 + + serialized_size(&self.file_size).unwrap(); + let mut buf = vec![0u8; len as usize]; + let mut wr = Cursor::new(&mut buf[..]); + serialize_into(&mut wr, &self.accounts_index).map_err(Error::custom)?; + serialize_into(&mut wr, &self.paths).map_err(Error::custom)?; + serialize_into(&mut wr, &self.storage).map_err(Error::custom)?; + serialize_into( + &mut wr, + &(self.write_version.load(Ordering::Relaxed) as u64), + ) + .map_err(Error::custom)?; + serialize_into(&mut wr, &self.file_size).map_err(Error::custom)?; + let len = wr.position() as usize; + serializer.serialize_bytes(&wr.into_inner()[..len]) + } +} + +struct AccountsDBVisitor; + +impl<'a> serde::de::Visitor<'a> for AccountsDBVisitor { + type Value = AccountsDB; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting AccountsDB") + } + + fn visit_bytes(self, data: &[u8]) -> std::result::Result + where + E: serde::de::Error, + { + use serde::de::Error; + let mut rd = Cursor::new(&data[..]); + let accounts_index: RwLock> = + deserialize_from(&mut rd).map_err(Error::custom)?; + let paths: Vec = deserialize_from(&mut rd).map_err(Error::custom)?; + let storage: RwLock = deserialize_from(&mut rd).map_err(Error::custom)?; + let write_version: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; + let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; + + let mut ids: Vec = storage.read().unwrap().0.keys().cloned().collect(); + ids.sort(); + + let mut accounts_db = AccountsDB { + accounts_index, + storage, + next_id: AtomicUsize::new(ids[ids.len() - 1] + 1), + write_version: AtomicUsize::new(write_version as usize), + paths, + file_size, + }; + accounts_db.generate_index(); + + Ok(accounts_db) + } +} + +impl<'de> Deserialize<'de> for AccountsDB { + fn deserialize(deserializer: D) -> std::result::Result + where + D: ::serde::Deserializer<'de>, + { + deserializer.deserialize_bytes(AccountsDBVisitor) + } } #[cfg(test)] mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; + use bincode::{deserialize_from, serialize_into, serialized_size}; use rand::{thread_rng, Rng}; use solana_sdk::account::Account; @@ -625,16 +816,16 @@ mod tests { db.store(1, &[(&pubkeys[0], &account)]); { let stores = db.storage.read().unwrap(); - assert_eq!(stores.len(), 2); - assert_eq!(stores[&0].count(), 2); - assert_eq!(stores[&1].count(), 2); + assert_eq!(stores.0.len(), 2); + assert_eq!(stores.0[&0].count(), 2); + assert_eq!(stores.0[&1].count(), 2); } db.add_root(1); { let stores = db.storage.read().unwrap(); - assert_eq!(stores.len(), 2); - assert_eq!(stores[&0].count(), 2); - assert_eq!(stores[&1].count(), 2); + assert_eq!(stores.0.len(), 2); + assert_eq!(stores.0[&0].count(), 2); + assert_eq!(stores.0[&1].count(), 2); } } @@ -707,19 +898,40 @@ mod tests { fn check_storage(accounts: &AccountsDB, count: usize) -> bool { let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.len(), 1); - assert_eq!(stores[&0].status(), AccountStorageStatus::StorageAvailable); - stores[&0].count() == count + assert_eq!(stores.0.len(), 1); + assert_eq!( + stores.0[&0].status(), + AccountStorageStatus::StorageAvailable + ); + stores.0[&0].count() == count } - fn check_accounts(accounts: &AccountsDB, pubkeys: &Vec, fork: Fork) { - for _ in 1..100 { - let idx = thread_rng().gen_range(0, 99); + fn check_accounts( + accounts: &AccountsDB, + pubkeys: &Vec, + fork: Fork, + num: usize, + count: usize, + ) { + for _ in 1..num { + let idx = thread_rng().gen_range(0, num - 1); let ancestors = vec![(fork, 0)].into_iter().collect(); let account = accounts.load_slow(&ancestors, &pubkeys[idx]).unwrap(); - let mut default_account = Account::default(); - default_account.lamports = (idx + 1) as u64; - assert_eq!((default_account, 0), account); + let account1 = Account::new((idx + count) as u64, 0, &Account::default().owner); + assert_eq!(account, (account1, fork)); + } + } + + fn modify_accounts( + accounts: &AccountsDB, + pubkeys: &Vec, + fork: Fork, + num: usize, + count: usize, + ) { + for idx in 0..num { + let account = Account::new((idx + count) as u64, 0, &Account::default().owner); + accounts.store(fork, &[(&pubkeys[idx], &account)]); } } @@ -742,7 +954,7 @@ mod tests { let accounts = AccountsDB::new(&paths.paths); let mut pubkeys: Vec = vec![]; create_account(&accounts, &mut pubkeys, 0, 100, 0, 0); - check_accounts(&accounts, &pubkeys, 0); + check_accounts(&accounts, &pubkeys, 0, 100, 1); } #[test] @@ -776,7 +988,7 @@ mod tests { } let mut append_vec_histogram = HashMap::new(); - for storage in accounts.storage.read().unwrap().values() { + for storage in accounts.storage.read().unwrap().0.values() { *append_vec_histogram.entry(storage.fork_id).or_insert(0) += 1; } for count in append_vec_histogram.values() { @@ -798,9 +1010,12 @@ mod tests { accounts.store(0, &[(&pubkey1, &account1)]); { let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.len(), 1); - assert_eq!(stores[&0].count(), 1); - assert_eq!(stores[&0].status(), AccountStorageStatus::StorageAvailable); + assert_eq!(stores.0.len(), 1); + assert_eq!(stores.0[&0].count(), 1); + assert_eq!( + stores.0[&0].status(), + AccountStorageStatus::StorageAvailable + ); } let pubkey2 = Pubkey::new_rand(); @@ -808,11 +1023,14 @@ mod tests { accounts.store(0, &[(&pubkey2, &account2)]); { let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.len(), 2); - assert_eq!(stores[&0].count(), 1); - assert_eq!(stores[&0].status(), AccountStorageStatus::StorageFull); - assert_eq!(stores[&1].count(), 1); - assert_eq!(stores[&1].status(), AccountStorageStatus::StorageAvailable); + assert_eq!(stores.0.len(), 2); + assert_eq!(stores.0[&0].count(), 1); + assert_eq!(stores.0[&0].status(), AccountStorageStatus::StorageFull); + assert_eq!(stores.0[&1].count(), 1); + assert_eq!( + stores.0[&1].status(), + AccountStorageStatus::StorageAvailable + ); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!( @@ -830,13 +1048,13 @@ mod tests { accounts.store(0, &[(&pubkey1, &account1)]); { let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.len(), 3); - assert_eq!(stores[&0].count(), count[index]); - assert_eq!(stores[&0].status(), status[0]); - assert_eq!(stores[&1].count(), 1); - assert_eq!(stores[&1].status(), status[1]); - assert_eq!(stores[&2].count(), count[index ^ 1]); - assert_eq!(stores[&2].status(), status[0]); + assert_eq!(stores.0.len(), 3); + assert_eq!(stores.0[&0].count(), count[index]); + assert_eq!(stores.0[&0].status(), status[0]); + assert_eq!(stores.0[&1].count(), 1); + assert_eq!(stores.0[&1].status(), status[1]); + assert_eq!(stores.0[&2].count(), count[index ^ 1]); + assert_eq!(stores.0[&2].status(), status[0]); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!( @@ -899,17 +1117,42 @@ mod tests { assert!(accounts.accounts_index.read().unwrap().is_purged(0)); //fork is still there, since gc is lazy - assert!(accounts.storage.read().unwrap().get(&info.id).is_some()); + assert!(accounts.storage.read().unwrap().0.get(&info.id).is_some()); //store causes cleanup accounts.store(1, &[(&pubkey, &account)]); //fork is gone - assert!(accounts.storage.read().unwrap().get(&info.id).is_none()); + assert!(accounts.storage.read().unwrap().0.get(&info.id).is_none()); //new value is there let ancestors = vec![(1, 1)].into_iter().collect(); assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1))); } + #[test] + fn test_accounts_db_serialize() { + solana_logger::setup(); + let paths = get_tmp_accounts_path!(); + let accounts = AccountsDB::new(&paths.paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 0, 100, 0, 0); + assert_eq!(check_storage(&accounts, 100), true); + check_accounts(&accounts, &pubkeys, 0, 100, 1); + modify_accounts(&accounts, &pubkeys, 0, 100, 2); + check_accounts(&accounts, &pubkeys, 0, 100, 2); + accounts.add_root(0); + + let mut pubkeys1: Vec = vec![]; + create_account(&accounts, &mut pubkeys1, 1, 10, 0, 0); + + let mut buf = vec![0u8; serialized_size(&accounts).unwrap() as usize]; + let mut writer = Cursor::new(&mut buf[..]); + serialize_into(&mut writer, &accounts).unwrap(); + + let mut reader = Cursor::new(&mut buf[..]); + let daccounts: AccountsDB = deserialize_from(&mut reader).unwrap(); + check_accounts(&daccounts, &pubkeys, 0, 100, 2); + check_accounts(&daccounts, &pubkeys1, 1, 10, 1); + } } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 46a2582315..d91f144e38 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1,13 +1,17 @@ -use hashbrown::{HashMap, HashSet}; use log::*; +use serde::{Deserialize, Serialize}; use solana_sdk::pubkey::Pubkey; +use std::collections::{HashMap, HashSet}; pub type Fork = u64; -#[derive(Default)] +#[derive(Debug, Default, Deserialize, Serialize)] pub struct AccountsIndex { - account_maps: HashMap>, - roots: HashSet, + #[serde(skip)] + pub account_maps: HashMap>, + + pub roots: HashSet, + //This value that needs to be stored to recover the index from AppendVec pub last_root: Fork, } @@ -35,7 +39,7 @@ impl AccountsIndex { let mut rv = vec![]; let mut fork_vec: Vec<(Fork, T)> = vec![]; { - let entry = self.account_maps.entry(*pubkey).or_insert(vec![]); + let entry = self.account_maps.entry(*pubkey).or_insert_with(|| vec![]); std::mem::swap(entry, &mut fork_vec); }; @@ -54,11 +58,17 @@ impl AccountsIndex { ); fork_vec.retain(|(fork, _)| !self.is_purged(*fork)); { - let entry = self.account_maps.entry(*pubkey).or_insert(vec![]); + let entry = self.account_maps.entry(*pubkey).or_insert_with(|| vec![]); std::mem::swap(entry, &mut fork_vec); }; rv } + + pub fn add_index(&mut self, fork: Fork, pubkey: &Pubkey, account_info: T) { + let entry = self.account_maps.entry(*pubkey).or_insert_with(|| vec![]); + entry.push((fork, account_info)); + } + pub fn is_purged(&self, fork: Fork) -> bool { fork < self.last_root } diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index fc090c3720..dd760d9043 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -1,10 +1,13 @@ +use bincode::{deserialize_from, serialize_into, serialized_size}; use memmap::MmapMut; +use serde::{Deserialize, Serialize}; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; +use std::fmt; use std::fs::OpenOptions; -use std::io::{Seek, SeekFrom, Write}; +use std::io::{Cursor, Seek, SeekFrom, Write}; use std::mem; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; @@ -26,7 +29,7 @@ pub struct StorageMeta { pub data_len: u64, } -#[derive(Serialize, Deserialize, Clone, Default, Eq, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Debug, Default, Eq, PartialEq)] pub struct AccountBalance { /// lamports in the account pub lamports: u64, @@ -38,11 +41,13 @@ pub struct AccountBalance { /// References to Memory Mapped memory /// The Account is stored separately from its data, so getting the actual account requires a clone +#[derive(PartialEq, Debug)] pub struct StoredAccount<'a> { pub meta: &'a StorageMeta, /// account data pub balance: &'a AccountBalance, pub data: &'a [u8], + pub offset: usize, } impl<'a> StoredAccount<'a> { @@ -56,7 +61,10 @@ impl<'a> StoredAccount<'a> { } } +#[derive(Debug)] +#[allow(clippy::mutex_atomic)] pub struct AppendVec { + path: PathBuf, map: MmapMut, // This mutex forces append to be single threaded, but concurrent with reads append_offset: Mutex, @@ -82,6 +90,7 @@ impl AppendVec { let map = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") }; AppendVec { + path: file.to_path_buf(), map, // This mutex forces append to be single threaded, but concurrent with reads // See UNSAFE usage in `append_ptr` @@ -184,6 +193,7 @@ impl AppendVec { meta, balance, data, + offset, }, next, )) @@ -286,6 +296,73 @@ pub mod test_utils { } } +impl Serialize for AppendVec { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::ser::Serializer, + { + use serde::ser::Error; + let len = serialized_size(&self.path).unwrap() + + std::mem::size_of::() as u64 + + serialized_size(&self.file_size).unwrap(); + let mut buf = vec![0u8; len as usize]; + let mut wr = Cursor::new(&mut buf[..]); + serialize_into(&mut wr, &self.path).map_err(Error::custom)?; + serialize_into(&mut wr, &(self.current_len.load(Ordering::Relaxed) as u64)) + .map_err(Error::custom)?; + serialize_into(&mut wr, &self.file_size).map_err(Error::custom)?; + let len = wr.position() as usize; + serializer.serialize_bytes(&wr.into_inner()[..len]) + } +} + +struct AppendVecVisitor; + +impl<'a> serde::de::Visitor<'a> for AppendVecVisitor { + type Value = AppendVec; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting AppendVec") + } + + #[allow(clippy::mutex_atomic)] + fn visit_bytes(self, data: &[u8]) -> std::result::Result + where + E: serde::de::Error, + { + use serde::de::Error; + let mut rd = Cursor::new(&data[..]); + let path: PathBuf = deserialize_from(&mut rd).map_err(Error::custom)?; + let current_len: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; + let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; + + let data = OpenOptions::new() + .read(true) + .write(true) + .create(false) + .open(path.as_path()) + .map_err(Error::custom)?; + + let map = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") }; + Ok(AppendVec { + path, + map, + append_offset: Mutex::new(0), + current_len: AtomicUsize::new(current_len as usize), + file_size, + }) + } +} + +impl<'de> Deserialize<'de> for AppendVec { + fn deserialize(deserializer: D) -> std::result::Result + where + D: ::serde::Deserializer<'de>, + { + deserializer.deserialize_bytes(AppendVecVisitor) + } +} + #[cfg(test)] pub mod tests { use super::test_utils::*; @@ -355,4 +432,30 @@ pub mod tests { duration_as_ms(&now.elapsed()), ); } + + #[test] + fn test_append_vec_serialize() { + let path = Path::new("append_vec_serialize"); + let av: AppendVec = AppendVec::new(path, true, 1024 * 1024); + let account1 = create_test_account(1); + let index1 = av.append_account_test(&account1).unwrap(); + assert_eq!(index1, 0); + assert_eq!(av.get_account_test(index1).unwrap(), account1); + + let account2 = create_test_account(2); + let index2 = av.append_account_test(&account2).unwrap(); + assert_eq!(av.get_account_test(index2).unwrap(), account2); + assert_eq!(av.get_account_test(index1).unwrap(), account1); + + let mut buf = vec![0u8; serialized_size(&av).unwrap() as usize]; + let mut writer = Cursor::new(&mut buf[..]); + serialize_into(&mut writer, &av).unwrap(); + + let mut reader = Cursor::new(&mut buf[..]); + let dav: AppendVec = deserialize_from(&mut reader).unwrap(); + + assert_eq!(dav.get_account_test(index2).unwrap(), account2); + assert_eq!(dav.get_account_test(index1).unwrap(), account1); + std::fs::remove_file(path).unwrap(); + } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 144778450f..e9b655b21d 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -4,15 +4,18 @@ //! already been signed and verified. use crate::accounts::{AccountLockType, Accounts}; -use crate::accounts_db::{ErrorCounters, InstructionAccounts, InstructionLoaders}; +use crate::accounts_db::{AccountsDB, ErrorCounters, InstructionAccounts, InstructionLoaders}; use crate::accounts_index::Fork; use crate::blockhash_queue::BlockhashQueue; use crate::locked_accounts_results::LockedAccountsResults; use crate::message_processor::{MessageProcessor, ProcessInstruction}; +use crate::serde_utils::{ + deserialize_atomicbool, deserialize_atomicusize, serialize_atomicbool, serialize_atomicusize, +}; use crate::status_cache::StatusCache; -use bincode::serialize; -use hashbrown::HashMap; +use bincode::{deserialize_from, serialize, serialize_into, serialized_size}; use log::*; +use serde::{Deserialize, Serialize}; use solana_metrics::counter::Counter; use solana_metrics::influxdb; use solana_sdk::account::Account; @@ -28,13 +31,16 @@ use solana_sdk::transaction::{Result, Transaction, TransactionError}; use solana_vote_api::vote_state::MAX_LOCKOUT_HISTORY; use std::borrow::Borrow; use std::cmp; +use std::collections::HashMap; +use std::fmt; +use std::io::Cursor; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::time::Instant; pub const MINIMUM_SLOT_LENGTH: usize = MAX_LOCKOUT_HISTORY + 1; -#[derive(Default, Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Default, Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)] pub struct EpochSchedule { /// The maximum number of slots in each epoch. pub slots_per_epoch: u64, @@ -132,20 +138,84 @@ impl EpochSchedule { type BankStatusCache = StatusCache>; -/// Manager for the state of all accounts and programs after processing its entries. #[derive(Default)] -pub struct Bank { +pub struct BankRc { /// where all the Accounts are stored accounts: Arc, /// A cache of signature statuses status_cache: Arc>, - /// FIFO queue of `recent_blockhash` items - blockhash_queue: RwLock, - /// Previous checkpoint of this bank parent: RwLock>>, +} + +impl Serialize for BankRc { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::ser::Serializer, + { + use serde::ser::Error; + let len = serialized_size(&*self.accounts.accounts_db).unwrap() + + serialized_size(&*self.accounts).unwrap() + + serialized_size(&*self.status_cache).unwrap(); + let mut buf = vec![0u8; len as usize]; + let mut wr = Cursor::new(&mut buf[..]); + serialize_into(&mut wr, &*self.accounts.accounts_db).map_err(Error::custom)?; + serialize_into(&mut wr, &*self.accounts).map_err(Error::custom)?; + serialize_into(&mut wr, &*self.status_cache).map_err(Error::custom)?; + let len = wr.position() as usize; + serializer.serialize_bytes(&wr.into_inner()[..len]) + } +} + +struct BankRcVisitor; + +impl<'a> serde::de::Visitor<'a> for BankRcVisitor { + type Value = BankRc; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting BankRc") + } + + #[allow(clippy::mutex_atomic)] + fn visit_bytes(self, data: &[u8]) -> std::result::Result + where + E: serde::de::Error, + { + use serde::de::Error; + let mut rd = Cursor::new(&data[..]); + let accounts_db: AccountsDB = deserialize_from(&mut rd).map_err(Error::custom)?; + let mut accounts: Accounts = deserialize_from(&mut rd).map_err(Error::custom)?; + let status_cache: BankStatusCache = deserialize_from(&mut rd).map_err(Error::custom)?; + + accounts.accounts_db = Arc::new(accounts_db); + Ok(BankRc { + accounts: Arc::new(accounts), + status_cache: Arc::new(RwLock::new(status_cache)), + parent: RwLock::new(None), + }) + } +} + +impl<'de> Deserialize<'de> for BankRc { + fn deserialize(deserializer: D) -> std::result::Result + where + D: ::serde::Deserializer<'de>, + { + deserializer.deserialize_bytes(BankRcVisitor) + } +} + +/// Manager for the state of all accounts and programs after processing its entries. +#[derive(Default, Deserialize, Serialize)] +pub struct Bank { + /// References to accounts, parent and signature status + #[serde(skip)] + pub rc: BankRc, + + /// FIFO queue of `recent_blockhash` items + blockhash_queue: RwLock, /// The set of parents including this bank pub ancestors: HashMap, @@ -157,9 +227,13 @@ pub struct Bank { parent_hash: Hash, /// The number of transactions processed without error + #[serde(serialize_with = "serialize_atomicusize")] + #[serde(deserialize_with = "deserialize_atomicusize")] transaction_count: AtomicUsize, // TODO: Use AtomicU64 if/when available /// Bank tick height + #[serde(serialize_with = "serialize_atomicusize")] + #[serde(deserialize_with = "deserialize_atomicusize")] tick_height: AtomicUsize, // TODO: Use AtomicU64 if/when available // Bank max_tick_height @@ -192,6 +266,8 @@ pub struct Bank { /// A boolean reflecting whether any entries were recorded into the PoH /// stream for the slot == self.slot + #[serde(serialize_with = "serialize_atomicbool")] + #[serde(deserialize_with = "deserialize_atomicbool")] is_delta: AtomicBool, /// The Message processor @@ -212,7 +288,7 @@ impl Bank { pub fn new_with_paths(genesis_block: &GenesisBlock, paths: Option) -> Self { let mut bank = Self::default(); bank.ancestors.insert(bank.slot(), 0); - bank.accounts = Arc::new(Accounts::new(paths)); + bank.rc.accounts = Arc::new(Accounts::new(paths)); bank.process_genesis_block(genesis_block); // genesis needs stakes for all epochs up to the epoch implied by // slot = 0 and genesis configuration @@ -230,7 +306,7 @@ impl Bank { let mut bank = Self::default(); bank.blockhash_queue = RwLock::new(parent.blockhash_queue.read().unwrap().clone()); - bank.status_cache = parent.status_cache.clone(); + bank.rc.status_cache = parent.rc.status_cache.clone(); bank.bank_height = parent.bank_height + 1; bank.fee_calculator = parent.fee_calculator.clone(); @@ -255,11 +331,11 @@ impl Bank { .to_owned(), ); - bank.parent = RwLock::new(Some(parent.clone())); + bank.rc.parent = RwLock::new(Some(parent.clone())); bank.parent_hash = parent.hash(); bank.collector_id = *collector_id; - bank.accounts = Arc::new(Accounts::new_from_parent(&parent.accounts)); + bank.rc.accounts = Arc::new(Accounts::new_from_parent(&parent.rc.accounts)); bank.epoch_vote_accounts = { let mut epoch_vote_accounts = parent.epoch_vote_accounts.clone(); @@ -315,19 +391,19 @@ impl Bank { self.freeze(); let parents = self.parents(); - *self.parent.write().unwrap() = None; + *self.rc.parent.write().unwrap() = None; let squash_accounts_start = Instant::now(); for p in parents.iter().rev() { // root forks cannot be purged - self.accounts.add_root(p.slot()); + self.rc.accounts.add_root(p.slot()); } let squash_accounts_ms = duration_as_ms(&squash_accounts_start.elapsed()); let squash_cache_start = Instant::now(); parents .iter() - .for_each(|p| self.status_cache.write().unwrap().add_root(p.slot())); + .for_each(|p| self.rc.status_cache.write().unwrap().add_root(p.slot())); let squash_cache_ms = duration_as_ms(&squash_cache_start.elapsed()); solana_metrics::submit( @@ -346,7 +422,7 @@ impl Bank { /// Return the more recent checkpoint of this bank instance. pub fn parent(&self) -> Option> { - self.parent.read().unwrap().clone() + self.rc.parent.read().unwrap().clone() } fn process_genesis_block(&mut self, genesis_block: &GenesisBlock) { @@ -418,7 +494,7 @@ impl Bank { /// Forget all signatures. Useful for benchmarking. pub fn clear_signatures(&self) { - self.status_cache.write().unwrap().clear_signatures(); + self.rc.status_cache.write().unwrap().clear_signatures(); } pub fn can_commit(result: &Result<()>) -> bool { @@ -430,7 +506,7 @@ impl Bank { } fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) { - let mut status_cache = self.status_cache.write().unwrap(); + let mut status_cache = self.rc.status_cache.write().unwrap(); for (i, tx) in txs.iter().enumerate() { if Self::can_commit(&res[i]) && !tx.signatures.is_empty() { status_cache.insert( @@ -515,7 +591,7 @@ impl Bank { } // TODO: put this assert back in // assert!(!self.is_frozen()); - let results = self.accounts.lock_accounts(txs); + let results = self.rc.accounts.lock_accounts(txs); LockedAccountsResults::new(results, &self, txs, AccountLockType::AccountLock) } @@ -526,11 +602,12 @@ impl Bank { if locked_accounts_results.needs_unlock { locked_accounts_results.needs_unlock = false; match locked_accounts_results.lock_type() { - AccountLockType::AccountLock => self.accounts.unlock_accounts( + AccountLockType::AccountLock => self.rc.accounts.unlock_accounts( locked_accounts_results.transactions(), locked_accounts_results.locked_accounts_results(), ), AccountLockType::RecordLock => self + .rc .accounts .unlock_record_accounts(locked_accounts_results.transactions()), } @@ -544,12 +621,12 @@ impl Bank { where I: std::borrow::Borrow, { - self.accounts.lock_record_accounts(txs); + self.rc.accounts.lock_record_accounts(txs); LockedAccountsResults::new(vec![], &self, txs, AccountLockType::RecordLock) } pub fn unlock_record_accounts(&self, txs: &[Transaction]) { - self.accounts.unlock_record_accounts(txs) + self.rc.accounts.unlock_record_accounts(txs) } fn load_accounts( @@ -558,7 +635,7 @@ impl Bank { results: Vec>, error_counters: &mut ErrorCounters, ) -> Vec> { - self.accounts.load_accounts( + self.rc.accounts.load_accounts( &self.ancestors, txs, results, @@ -612,7 +689,7 @@ impl Bank { lock_results: Vec>, error_counters: &mut ErrorCounters, ) -> Vec> { - let rcache = self.status_cache.read().unwrap(); + let rcache = self.rc.status_cache.read().unwrap(); txs.iter() .zip(lock_results.into_iter()) .map(|(tx, lock_res)| { @@ -806,7 +883,8 @@ impl Bank { // TODO: put this assert back in // assert!(!self.is_frozen()); let now = Instant::now(); - self.accounts + self.rc + .accounts .store_accounts(self.slot(), txs, executed, loaded_accounts); self.store_vote_accounts(txs, executed, loaded_accounts); @@ -874,7 +952,7 @@ impl Bank { } fn store(&self, pubkey: &Pubkey, account: &Account) { - self.accounts.store_slow(self.slot(), pubkey, account); + self.rc.accounts.store_slow(self.slot(), pubkey, account); if solana_vote_api::check_id(&account.owner) { let mut vote_accounts = self.vote_accounts.write().unwrap(); if account.lamports != 0 { @@ -907,8 +985,22 @@ impl Bank { self.store(pubkey, &account); } + pub fn accounts(&self) -> Arc { + self.rc.accounts.clone() + } + + pub fn set_bank_rc(&mut self, bank_rc: &BankRc) { + self.rc.accounts = bank_rc.accounts.clone(); + self.rc.status_cache = bank_rc.status_cache.clone() + } + + pub fn set_parent(&mut self, parent: &Arc) { + self.rc.parent = RwLock::new(Some(parent.clone())); + } + pub fn get_account(&self, pubkey: &Pubkey) -> Option { - self.accounts + self.rc + .accounts .load_slow(&self.ancestors, pubkey) .map(|(account, _)| account) } @@ -917,12 +1009,12 @@ impl Bank { &self, program_id: &Pubkey, ) -> Vec<(Pubkey, Account)> { - self.accounts.load_by_program(self.slot(), program_id) + self.rc.accounts.load_by_program(self.slot(), program_id) } pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option<(Account, Fork)> { let just_self: HashMap = vec![(self.slot(), 0)].into_iter().collect(); - self.accounts.load_slow(&just_self, pubkey) + self.rc.accounts.load_slow(&just_self, pubkey) } pub fn transaction_count(&self) -> u64 { @@ -937,7 +1029,7 @@ impl Bank { &self, signature: &Signature, ) -> Option<(usize, Result<()>)> { - let rcache = self.status_cache.read().unwrap(); + let rcache = self.rc.status_cache.read().unwrap(); rcache.get_signature_status_slow(signature, &self.ancestors) } @@ -955,11 +1047,11 @@ impl Bank { fn hash_internal_state(&self) -> Hash { // If there are no accounts, return the same hash as we did before // checkpointing. - if !self.accounts.has_accounts(self.slot()) { + if !self.rc.accounts.has_accounts(self.slot()) { return self.parent_hash; } - let accounts_delta_hash = self.accounts.hash_internal_state(self.slot()); + let accounts_delta_hash = self.rc.accounts.hash_internal_state(self.slot()); extend_and_hash(&self.parent_hash, &serialize(&accounts_delta_hash).unwrap()) } @@ -1059,12 +1151,45 @@ impl Bank { // Register a bogus executable account, which will be loaded and ignored. self.register_native_instruction_processor("", &program_id); } + + pub fn compare_bank(&self, dbank: &Bank) { + assert_eq!(self.slot, dbank.slot); + assert_eq!(self.collector_id, dbank.collector_id); + assert_eq!(self.epoch_schedule, dbank.epoch_schedule); + assert_eq!(self.epoch_vote_accounts, dbank.epoch_vote_accounts); + assert_eq!(self.ticks_per_slot, dbank.ticks_per_slot); + assert_eq!(self.parent_hash, dbank.parent_hash); + assert_eq!( + self.tick_height.load(Ordering::SeqCst), + dbank.tick_height.load(Ordering::SeqCst) + ); + assert_eq!( + self.is_delta.load(Ordering::SeqCst), + dbank.is_delta.load(Ordering::SeqCst) + ); + + let bh = self.hash.read().unwrap(); + let dbh = dbank.hash.read().unwrap(); + assert_eq!(*bh, *dbh); + + let bhq = self.blockhash_queue.read().unwrap(); + let dbhq = dbank.blockhash_queue.read().unwrap(); + assert_eq!(*bhq, *dbhq); + + let sc = self.rc.status_cache.read().unwrap(); + let dsc = dbank.rc.status_cache.read().unwrap(); + assert_eq!(*sc, *dsc); + assert_eq!( + self.rc.accounts.hash_internal_state(self.slot), + dbank.rc.accounts.hash_internal_state(dbank.slot) + ); + } } impl Drop for Bank { fn drop(&mut self) { // For root forks this is a noop - self.accounts.purge_fork(self.slot()); + self.rc.accounts.purge_fork(self.slot()); } } @@ -1072,6 +1197,7 @@ impl Drop for Bank { mod tests { use super::*; use crate::genesis_utils::{create_genesis_block_with_leader, BOOTSTRAP_LEADER_LAMPORTS}; + use bincode::{deserialize_from, serialize_into, serialized_size}; use solana_sdk::genesis_block::create_genesis_block; use solana_sdk::hash; use solana_sdk::instruction::InstructionError; @@ -1080,6 +1206,7 @@ mod tests { use solana_sdk::system_transaction; use solana_vote_api::vote_instruction; use solana_vote_api::vote_state::VoteState; + use std::io::Cursor; #[test] fn test_bank_new() { @@ -1984,4 +2111,29 @@ mod tests { assert!(bank.is_delta.load(Ordering::Relaxed)); } + + #[test] + fn test_bank_serialize() { + let (genesis_block, _) = create_genesis_block(500); + let bank0 = Arc::new(Bank::new(&genesis_block)); + let bank = new_from_parent(&bank0); + + // Test new account + let key = Keypair::new(); + bank.deposit(&key.pubkey(), 10); + assert_eq!(bank.get_balance(&key.pubkey()), 10); + + let len = serialized_size(&bank).unwrap() + serialized_size(&bank.rc).unwrap(); + let mut buf = vec![0u8; len as usize]; + let mut writer = Cursor::new(&mut buf[..]); + serialize_into(&mut writer, &bank).unwrap(); + serialize_into(&mut writer, &bank.rc).unwrap(); + + let mut reader = Cursor::new(&mut buf[..]); + let mut dbank: Bank = deserialize_from(&mut reader).unwrap(); + let dbank_rc: BankRc = deserialize_from(&mut reader).unwrap(); + dbank.rc = dbank_rc; + assert_eq!(dbank.get_balance(&key.pubkey()), 10); + bank.compare_bank(&dbank); + } } diff --git a/runtime/src/blockhash_queue.rs b/runtime/src/blockhash_queue.rs index 2143be4341..8ad557f880 100644 --- a/runtime/src/blockhash_queue.rs +++ b/runtime/src/blockhash_queue.rs @@ -1,15 +1,16 @@ -use hashbrown::HashMap; +use serde::{Deserialize, Serialize}; use solana_sdk::hash::Hash; use solana_sdk::timing::timestamp; +use std::collections::HashMap; -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] struct HashAge { timestamp: u64, hash_height: u64, } /// Low memory overhead, so can be cloned for every checkpoint -#[derive(Clone)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct BlockhashQueue { /// updated whenever an hash is registered hash_height: u64, diff --git a/runtime/src/bloom.rs b/runtime/src/bloom.rs index c1674cffd1..ede157eb6e 100644 --- a/runtime/src/bloom.rs +++ b/runtime/src/bloom.rs @@ -16,7 +16,8 @@ pub trait BloomHashIndex { #[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)] pub struct Bloom { pub keys: Vec, - pub bits: BitVec, + pub bits: BitVec, + num_bits_set: u64, _phantom: PhantomData, } @@ -26,6 +27,7 @@ impl Bloom { Bloom { keys, bits, + num_bits_set: 0, _phantom: PhantomData::default(), } } @@ -47,11 +49,15 @@ impl Bloom { } pub fn clear(&mut self) { self.bits = BitVec::new_fill(false, self.bits.len()); + self.num_bits_set = 0; } pub fn add(&mut self, key: &T) { for k in &self.keys { let pos = self.pos(key, *k); - self.bits.set(pos, true); + if !self.bits.get(pos) { + self.num_bits_set += 1; + self.bits.set(pos, true); + } } } pub fn contains(&self, key: &T) -> bool { diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index d823d3e644..d7a69b391c 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -11,11 +11,9 @@ pub mod loader_utils; pub mod locked_accounts_results; pub mod message_processor; mod native_loader; +mod serde_utils; mod status_cache; mod system_instruction_processor; #[macro_use] extern crate solana_metrics; - -#[macro_use] -extern crate serde_derive; diff --git a/runtime/src/message_processor.rs b/runtime/src/message_processor.rs index 2c84292d34..0a2ecbb4e7 100644 --- a/runtime/src/message_processor.rs +++ b/runtime/src/message_processor.rs @@ -1,5 +1,6 @@ use crate::native_loader; use crate::system_instruction_processor; +use serde::{Deserialize, Serialize}; use solana_sdk::account::{create_keyed_accounts, Account, KeyedAccount}; use solana_sdk::instruction::{CompiledInstruction, InstructionError}; use solana_sdk::instruction_processor_utils; @@ -85,8 +86,11 @@ pub type ProcessInstruction = pub type SymbolCache = RwLock, Symbol>>; +#[derive(Serialize, Deserialize)] pub struct MessageProcessor { + #[serde(skip)] instruction_processors: Vec<(Pubkey, ProcessInstruction)>, + #[serde(skip)] symbol_cache: SymbolCache, } diff --git a/runtime/src/serde_utils.rs b/runtime/src/serde_utils.rs new file mode 100644 index 0000000000..f4376d83b1 --- /dev/null +++ b/runtime/src/serde_utils.rs @@ -0,0 +1,62 @@ +use std::fmt; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + +struct U64Visitor; +impl<'a> serde::de::Visitor<'a> for U64Visitor { + type Value = u64; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting u64") + } + fn visit_u64(self, data: u64) -> std::result::Result + where + E: serde::de::Error, + { + Ok(data) + } +} + +pub fn deserialize_atomicusize<'de, D>(d: D) -> Result +where + D: serde::de::Deserializer<'de>, +{ + let value = d.deserialize_u64(U64Visitor)?; + Ok(AtomicUsize::new(value as usize)) +} + +pub fn serialize_atomicusize(x: &AtomicUsize, s: S) -> Result +where + S: serde::Serializer, +{ + s.serialize_u64(x.load(Ordering::SeqCst) as u64) +} + +struct BoolVisitor; +impl<'a> serde::de::Visitor<'a> for BoolVisitor { + type Value = bool; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting bool") + } + fn visit_bool(self, data: bool) -> std::result::Result + where + E: serde::de::Error, + { + Ok(data) + } +} + +pub fn deserialize_atomicbool<'de, D>(d: D) -> Result +where + D: serde::de::Deserializer<'de>, +{ + let value = d.deserialize_bool(BoolVisitor)?; + Ok(AtomicBool::new(value)) +} + +pub fn serialize_atomicbool(x: &AtomicBool, s: S) -> Result +where + S: serde::Serializer, +{ + s.serialize_bool(x.load(Ordering::SeqCst)) +} diff --git a/runtime/src/status_cache.rs b/runtime/src/status_cache.rs index a1d377f778..5c7ffa40b7 100644 --- a/runtime/src/status_cache.rs +++ b/runtime/src/status_cache.rs @@ -1,7 +1,8 @@ -use hashbrown::{HashMap, HashSet}; use log::*; +use serde::{Deserialize, Serialize}; use solana_sdk::hash::Hash; use solana_sdk::signature::Signature; +use std::collections::{HashMap, HashSet}; const MAX_CACHE_ENTRIES: usize = solana_sdk::timing::MAX_HASH_AGE_IN_SECONDS; @@ -11,6 +12,7 @@ pub type ForkStatus = Vec<(ForkId, T)>; type SignatureMap = HashMap>; type StatusMap = HashMap)>; +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] pub struct StatusCache { /// all signatures seen during a hash period cache: StatusMap, @@ -82,7 +84,7 @@ impl StatusCache { .entry(*transaction_blockhash) .or_insert((fork, HashMap::new())); sig_map.0 = std::cmp::max(fork, sig_map.0); - let sig_forks = sig_map.1.entry(*sig).or_insert(vec![]); + let sig_forks = sig_map.1.entry(*sig).or_insert_with(|| vec![]); sig_forks.push((fork, res)); } @@ -97,7 +99,9 @@ impl StatusCache { #[cfg(test)] mod tests { use super::*; + use bincode::{deserialize_from, serialize_into, serialized_size}; use solana_sdk::hash::hash; + use std::io::Cursor; type BankStatusCache = StatusCache<()>; @@ -242,4 +246,29 @@ mod tests { .get_signature_status(&sig, &blockhash, &ancestors) .is_some()); } + + fn test_serialize(sc: &BankStatusCache) { + let mut buf = vec![0u8; serialized_size(sc).unwrap() as usize]; + let mut writer = Cursor::new(&mut buf[..]); + serialize_into(&mut writer, sc).unwrap(); + + let mut reader = Cursor::new(&mut buf[..]); + let deser: BankStatusCache = deserialize_from(&mut reader).unwrap(); + assert_eq!(*sc, deser); + } + + #[test] + fn test_statuscache_serialize() { + let sig = Signature::default(); + let mut status_cache = BankStatusCache::default(); + let blockhash = hash(Hash::default().as_ref()); + status_cache.add_root(0); + status_cache.clear_signatures(); + status_cache.insert(&blockhash, &sig, 0, ()); + test_serialize(&status_cache); + + let new_blockhash = hash(Hash::default().as_ref()); + status_cache.insert(&new_blockhash, &sig, 1, ()); + test_serialize(&status_cache); + } }