Create bank snapshots (#4244)

* Revert "Revert "Create bank snapshots (#3671)" (#4243)"

This reverts commit 81fa69d347.

* keep saved and unsaved copies of status cache

* fix format check

* bench for status cache serialize

* misc cleanup

* remove appendvec storage on purge

* fix accounts restore

* cleanup

* Pass snapshot path as args

* Fix clippy
Sathish, 2019-05-30 21:31:35 -07:00 (committed by GitHub)
parent 2d284ba6db
commit 182096dc1a
29 changed files with 1467 additions and 175 deletions


@ -1,10 +1,15 @@
//! The `bank_forks` module implements BankForks, a DAG of checkpointed Banks
use hashbrown::{HashMap, HashSet};
use bincode::{deserialize_from, serialize_into};
use solana_metrics::inc_new_counter_info;
use solana_runtime::bank::Bank;
use solana_runtime::bank::{Bank, BankRc, StatusCacheRc};
use solana_sdk::timing;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::fs::File;
use std::io::{BufReader, BufWriter, Error, ErrorKind};
use std::ops::Index;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
@ -12,6 +17,8 @@ pub struct BankForks {
banks: HashMap<u64, Arc<Bank>>,
working_bank: Arc<Bank>,
root: u64,
slots: HashSet<u64>,
snapshot_path: Option<String>,
}
impl Index<u64> for BankForks {
@ -30,6 +37,8 @@ impl BankForks {
banks,
working_bank,
root: 0,
slots: HashSet::new(),
snapshot_path: None,
}
}
@ -45,6 +54,7 @@ impl BankForks {
}
/// Create a map of bank slot id to the set of all of its descendants
#[allow(clippy::or_fun_call)]
pub fn descendants(&self) -> HashMap<u64, HashSet<u64>> {
let mut descendants = HashMap::new();
for bank in self.banks.values() {
@ -91,6 +101,8 @@ impl BankForks {
root,
banks,
working_bank,
slots: HashSet::new(),
snapshot_path: None,
}
}
@ -138,9 +150,199 @@ impl BankForks {
}
fn prune_non_root(&mut self, root: u64) {
let slots: HashSet<u64> = self
.banks
.iter()
.filter(|(_, b)| b.is_frozen())
.map(|(k, _)| *k)
.collect();
let descendants = self.descendants();
self.banks
.retain(|slot, _| descendants[&root].contains(slot))
.retain(|slot, _| descendants[&root].contains(slot));
if self.snapshot_path.is_some() {
let diff: HashSet<_> = slots.symmetric_difference(&self.slots).collect();
trace!("prune non root {} - {:?}", root, diff);
for slot in diff.iter() {
if **slot > root {
let _ = self.add_snapshot(**slot, root);
} else if **slot > 0 {
BankForks::remove_snapshot(**slot, &self.snapshot_path);
}
}
}
self.slots = slots.clone();
}
fn get_io_error(error: &str) -> Error {
warn!("BankForks error: {:?}", error);
Error::new(ErrorKind::Other, error)
}
fn get_snapshot_path(path: &Option<String>) -> PathBuf {
Path::new(&path.clone().unwrap()).to_path_buf()
}
pub fn add_snapshot(&self, slot: u64, root: u64) -> Result<(), Error> {
let path = BankForks::get_snapshot_path(&self.snapshot_path);
fs::create_dir_all(path.clone())?;
let bank_file = format!("{}", slot);
let bank_file_path = path.join(bank_file);
trace!("path: {:?}", bank_file_path);
let file = File::create(bank_file_path)?;
let mut stream = BufWriter::new(file);
let bank_slot = self.get(slot);
if bank_slot.is_none() {
return Err(BankForks::get_io_error("bank_forks get error"));
}
let bank = bank_slot.unwrap().clone();
serialize_into(&mut stream, &*bank)
.map_err(|_| BankForks::get_io_error("serialize bank error"))?;
let mut parent_slot: u64 = 0;
if let Some(parent_bank) = bank.parent() {
parent_slot = parent_bank.slot();
}
serialize_into(&mut stream, &parent_slot)
.map_err(|_| BankForks::get_io_error("serialize bank parent error"))?;
serialize_into(&mut stream, &root)
.map_err(|_| BankForks::get_io_error("serialize root error"))?;
serialize_into(&mut stream, &bank.src)
.map_err(|_| BankForks::get_io_error("serialize bank status cache error"))?;
serialize_into(&mut stream, &bank.rc)
.map_err(|_| BankForks::get_io_error("serialize bank accounts error"))?;
Ok(())
}
pub fn remove_snapshot(slot: u64, path: &Option<String>) {
let path = BankForks::get_snapshot_path(path);
let bank_file = format!("{}", slot);
let bank_file_path = path.join(bank_file);
let _ = fs::remove_file(bank_file_path);
}
pub fn set_snapshot_config(&mut self, path: Option<String>) {
self.snapshot_path = path;
}
fn load_snapshots(
names: &[u64],
bank_maps: &mut Vec<(u64, u64, Bank)>,
status_cache_rc: &StatusCacheRc,
snapshot_path: &Option<String>,
) -> Option<(BankRc, u64)> {
let path = BankForks::get_snapshot_path(snapshot_path);
let mut bank_rc: Option<(BankRc, u64)> = None;
for bank_slot in names.iter().rev() {
let bank_path = format!("{}", bank_slot);
let bank_file_path = path.join(bank_path.clone());
info!("Load from {:?}", bank_file_path);
let file = File::open(bank_file_path);
if file.is_err() {
warn!("Snapshot file open failed for {}", bank_slot);
continue;
}
let file = file.unwrap();
let mut stream = BufReader::new(file);
let bank: Result<Bank, std::io::Error> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize bank error"));
let slot: Result<u64, std::io::Error> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize bank parent error"));
let parent_slot = if slot.is_ok() { slot.unwrap() } else { 0 };
let root: Result<u64, std::io::Error> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize root error"));
let status_cache: Result<StatusCacheRc, std::io::Error> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize bank status cache error"));
if bank_rc.is_none() {
let rc: Result<BankRc, std::io::Error> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize bank accounts error"));
if rc.is_ok() {
bank_rc = Some((rc.unwrap(), root.unwrap()));
}
}
if bank_rc.is_some() {
match bank {
Ok(v) => {
if status_cache.is_ok() {
status_cache_rc.append(&status_cache.unwrap());
}
bank_maps.push((*bank_slot, parent_slot, v));
}
Err(_) => warn!("Load snapshot failed for {}", bank_slot),
}
} else {
BankForks::remove_snapshot(*bank_slot, snapshot_path);
warn!("Load snapshot rc failed for {}", bank_slot);
}
}
bank_rc
}
fn setup_banks(
bank_maps: &mut Vec<(u64, u64, Bank)>,
bank_rc: &BankRc,
status_cache_rc: &StatusCacheRc,
) -> (HashMap<u64, Arc<Bank>>, HashSet<u64>, u64) {
let mut banks = HashMap::new();
let mut slots = HashSet::new();
let (last_slot, last_parent_slot, mut last_bank) = bank_maps.remove(0);
last_bank.set_bank_rc(&bank_rc, &status_cache_rc);
while let Some((slot, parent_slot, mut bank)) = bank_maps.pop() {
bank.set_bank_rc(&bank_rc, &status_cache_rc);
if parent_slot != 0 {
if let Some(parent) = banks.get(&parent_slot) {
bank.set_parent(parent);
}
}
if slot > 0 {
banks.insert(slot, Arc::new(bank));
slots.insert(slot);
}
}
if last_parent_slot != 0 {
if let Some(parent) = banks.get(&last_parent_slot) {
last_bank.set_parent(parent);
}
}
banks.insert(last_slot, Arc::new(last_bank));
slots.insert(last_slot);
(banks, slots, last_slot)
}
pub fn load_from_snapshot(snapshot_path: &Option<String>) -> Result<Self, Error> {
let path = BankForks::get_snapshot_path(snapshot_path);
let paths = fs::read_dir(path)?;
let mut names = paths
.filter_map(|entry| {
entry.ok().and_then(|e| {
e.path()
.file_name()
.and_then(|n| n.to_str().map(|s| s.parse::<u64>().unwrap()))
})
})
.collect::<Vec<u64>>();
names.sort();
let mut bank_maps = vec![];
let status_cache_rc = StatusCacheRc::default();
let rc = BankForks::load_snapshots(&names, &mut bank_maps, &status_cache_rc, snapshot_path);
if bank_maps.is_empty() || rc.is_none() {
BankForks::remove_snapshot(0, snapshot_path);
return Err(Error::new(ErrorKind::Other, "no snapshots loaded"));
}
let (bank_rc, root) = rc.unwrap();
let (banks, slots, last_slot) =
BankForks::setup_banks(&mut bank_maps, &bank_rc, &status_cache_rc);
let working_bank = banks[&last_slot].clone();
Ok(BankForks {
banks,
working_bank,
root,
slots,
snapshot_path: snapshot_path.clone(),
})
}
}
@ -150,6 +352,10 @@ mod tests {
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction;
use std::env;
use std::fs::remove_dir_all;
#[test]
fn test_bank_forks() {
@ -174,8 +380,8 @@ mod tests {
let bank = Bank::new_from_parent(&bank0, &Pubkey::default(), 2);
bank_forks.insert(bank);
let descendants = bank_forks.descendants();
let children: Vec<u64> = descendants[&0].iter().cloned().collect();
assert_eq!(children, vec![1, 2]);
let children: HashSet<u64> = [1u64, 2u64].to_vec().into_iter().collect();
assert_eq!(children, *descendants.get(&0).unwrap());
assert!(descendants[&1].is_empty());
assert!(descendants[&2].is_empty());
}
@ -219,4 +425,103 @@ mod tests {
assert_eq!(bank_forks.active_banks(), vec![1]);
}
struct TempPaths {
pub paths: String,
}
#[macro_export]
macro_rules! tmp_bank_accounts_name {
() => {
&format!("{}-{}", file!(), line!())
};
}
#[macro_export]
macro_rules! get_tmp_bank_accounts_path {
() => {
get_tmp_bank_accounts_path(tmp_bank_accounts_name!())
};
}
impl Drop for TempPaths {
fn drop(&mut self) {
let paths: Vec<String> = self.paths.split(',').map(|s| s.to_string()).collect();
paths.iter().for_each(|p| {
let _ignored = remove_dir_all(p);
});
}
}
fn get_paths_vec(paths: &str) -> Vec<String> {
paths.split(',').map(|s| s.to_string()).collect()
}
fn get_tmp_snapshots_path() -> TempPaths {
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let path = format!("{}/snapshots", out_dir);
TempPaths {
paths: path.to_string(),
}
}
fn get_tmp_bank_accounts_path(paths: &str) -> TempPaths {
let vpaths = get_paths_vec(paths);
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let vpaths: Vec<_> = vpaths
.iter()
.map(|path| format!("{}/{}", out_dir, path))
.collect();
TempPaths {
paths: vpaths.join(","),
}
}
fn restore_from_snapshot(bank_forks: BankForks, last_slot: u64) {
let new = BankForks::load_from_snapshot(&bank_forks.snapshot_path).unwrap();
for (slot, _) in new.banks.iter() {
let bank = bank_forks.banks.get(slot).unwrap().clone();
let new_bank = new.banks.get(slot).unwrap();
bank.compare_bank(&new_bank);
}
assert_eq!(new.working_bank().slot(), last_slot);
for (slot, _) in new.banks.iter() {
BankForks::remove_snapshot(*slot, &bank_forks.snapshot_path);
}
}
#[test]
fn test_bank_forks_snapshot_n() {
solana_logger::setup();
let path = get_tmp_bank_accounts_path!();
let spath = get_tmp_snapshots_path();
let GenesisBlockInfo {
genesis_block,
mint_keypair,
..
} = create_genesis_block(10_000);
for index in 0..10 {
let bank0 = Bank::new_with_paths(&genesis_block, Some(path.paths.clone()));
bank0.freeze();
let slot = bank0.slot();
let mut bank_forks = BankForks::new(0, bank0);
bank_forks.set_snapshot_config(Some(spath.paths.clone()));
bank_forks.add_snapshot(slot, 0).unwrap();
for forks in 0..index {
let bank = Bank::new_from_parent(&bank_forks[forks], &Pubkey::default(), forks + 1);
let key1 = Keypair::new().pubkey();
let tx = system_transaction::create_user_account(
&mint_keypair,
&key1,
1,
genesis_block.hash(),
);
assert_eq!(bank.process_transaction(&tx), Ok(()));
bank.freeze();
let slot = bank.slot();
bank_forks.insert(bank);
bank_forks.add_snapshot(slot, 0).unwrap();
}
restore_from_snapshot(bank_forks, index);
}
}
}
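
For reference, the snapshot file that add_snapshot writes (one file per slot, named for the slot) is a plain bincode concatenation: bank, parent slot, root, status cache, accounts, in that order; load_snapshots reads the files back newest first and keeps only the first BankRc it can deserialize. A minimal sketch of just that framing, with the Bank, StatusCacheRc, and BankRc payloads stood in by u64s (stand-ins only, not the real types):

use bincode::{deserialize_from, serialize_into};
use std::io::Cursor;

fn main() {
    // Write side, mirroring add_snapshot: fields serialized back to back.
    let (bank, parent_slot, root) = (2u64, 1u64, 0u64);
    let mut buf: Vec<u8> = Vec::new();
    serialize_into(&mut buf, &bank).unwrap();
    serialize_into(&mut buf, &parent_slot).unwrap();
    serialize_into(&mut buf, &root).unwrap();

    // Read side, mirroring load_snapshots: same order, same types.
    let mut rd = Cursor::new(&buf[..]);
    let b: u64 = deserialize_from(&mut rd).unwrap();
    let p: u64 = deserialize_from(&mut rd).unwrap();
    let r: u64 = deserialize_from(&mut rd).unwrap();
    assert_eq!((b, p, r), (bank, parent_slot, root));
}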


@ -11,7 +11,7 @@ use solana_kvstore as kvstore;
use bincode::deserialize;
use hashbrown::HashMap;
use std::collections::HashMap;
#[cfg(not(feature = "kvstore"))]
use rocksdb;


@ -26,7 +26,6 @@ use crate::staking_utils;
use crate::streamer::{BlobReceiver, BlobSender};
use bincode::{deserialize, serialize};
use core::cmp;
use hashbrown::HashMap;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
@ -40,7 +39,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature};
use solana_sdk::timing::{duration_as_ms, timestamp};
use solana_sdk::transaction::Transaction;
use std::cmp::min;
use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashMap};
use std::fmt;
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};


@ -8,10 +8,10 @@ use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_gossip_pull::CrdsGossipPull;
use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE};
use crate::crds_value::CrdsValue;
use hashbrown::HashMap;
use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
///The min size for bloom filters
pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000;


@ -16,13 +16,13 @@ use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_value::{CrdsValue, CrdsValueLabel};
use crate::packet::BLOB_DATA_SIZE;
use bincode::serialized_size;
use hashbrown::HashMap;
use rand;
use rand::distributions::{Distribution, WeightedIndex};
use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::collections::HashMap;
use std::collections::VecDeque;
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;


@ -15,7 +15,6 @@ use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_value::{CrdsValue, CrdsValueLabel};
use crate::packet::BLOB_DATA_SIZE;
use bincode::serialized_size;
use hashbrown::HashMap;
use indexmap::map::IndexMap;
use rand;
use rand::distributions::{Distribution, WeightedIndex};
@ -25,6 +24,7 @@ use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use std::cmp;
use std::collections::HashMap;
pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;


@ -1,13 +1,12 @@
use crate::bank_forks::BankForks;
use crate::staking_utils;
use hashbrown::{HashMap, HashSet};
use solana_metrics::datapoint_info;
use solana_runtime::bank::Bank;
use solana_sdk::account::Account;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_vote_api::vote_state::{Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY};
use std::collections::VecDeque;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
pub const VOTE_THRESHOLD_DEPTH: usize = 8;


@ -13,7 +13,6 @@ use crate::poh_recorder::PohRecorder;
use crate::result::{Error, Result};
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use hashbrown::HashMap;
use solana_metrics::{datapoint_warn, inc_new_counter_error, inc_new_counter_info};
use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
@ -22,6 +21,7 @@ use solana_sdk::signature::KeypairUtil;
use solana_sdk::timing::{self, duration_as_ms};
use solana_sdk::transaction::Transaction;
use solana_vote_api::vote_instruction;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex, RwLock};
@ -510,7 +510,7 @@ impl ReplayStage {
let bank_slot = bank.slot();
let bank_progress = &mut progress
.entry(bank_slot)
.or_insert(ForkProgress::new(bank.last_blockhash()));
.or_insert_with(|| ForkProgress::new(bank.last_blockhash()));
blocktree.get_slot_entries_with_blob_count(bank_slot, bank_progress.num_blobs as u64, None)
}
@ -522,7 +522,7 @@ impl ReplayStage {
) -> Result<()> {
let bank_progress = &mut progress
.entry(bank.slot())
.or_insert(ForkProgress::new(bank.last_blockhash()));
.or_insert_with(|| ForkProgress::new(bank.last_blockhash()));
let result = Self::verify_and_process_entries(&bank, &entries, &bank_progress.last_entry);
bank_progress.num_blobs += num;
if let Some(last_entry) = entries.last() {


@ -1,9 +1,9 @@
use hashbrown::HashMap;
use solana_runtime::bank::Bank;
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use solana_vote_api::vote_state::VoteState;
use std::borrow::Borrow;
use std::collections::HashMap;
/// Looks through vote accounts, and finds the latest slot that has achieved
/// supermajority lockout
@ -47,7 +47,7 @@ pub fn vote_account_stakes_at_epoch(
/// that have non-zero balance in any of their managed staking accounts
pub fn staked_nodes_at_epoch(bank: &Bank, epoch_height: u64) -> Option<HashMap<Pubkey, u64>> {
bank.epoch_vote_accounts(epoch_height)
.map(|vote_accounts| to_staked_nodes(to_vote_states(vote_accounts.into_iter())))
.map(|vote_accounts| to_staked_nodes(to_vote_states(vote_accounts.iter())))
}
// input (vote_pubkey, (stake, vote_account)) => (stake, vote_state)
@ -78,7 +78,7 @@ fn epoch_stakes_and_lockouts(bank: &Bank, epoch_height: u64) -> Vec<(u64, Option
let node_staked_accounts = bank
.epoch_vote_accounts(epoch_height)
.expect("Bank state for epoch is missing")
.into_iter();
.iter();
to_vote_states(node_staked_accounts)
.map(|(stake, states)| (stake, states.root_slot))
@ -116,13 +116,13 @@ pub(crate) mod tests {
create_genesis_block, create_genesis_block_with_leader, GenesisBlockInfo,
BOOTSTRAP_LEADER_LAMPORTS,
};
use hashbrown::HashSet;
use solana_sdk::instruction::Instruction;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::transaction::Transaction;
use solana_stake_api::stake_instruction;
use solana_vote_api::vote_instruction;
use std::collections::HashSet;
use std::iter::FromIterator;
use std::sync::Arc;


@ -38,6 +38,7 @@ pub struct ValidatorConfig {
pub storage_rotate_count: u64,
pub account_paths: Option<String>,
pub rpc_config: JsonRpcConfig,
pub snapshot_path: Option<String>,
}
impl Default for ValidatorConfig {
fn default() -> Self {
@ -52,6 +53,7 @@ impl Default for ValidatorConfig {
storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE,
account_paths: None,
rpc_config: JsonRpcConfig::default(),
snapshot_path: None,
}
}
}
@ -93,7 +95,11 @@ impl Validator {
completed_slots_receiver,
leader_schedule_cache,
poh_config,
) = new_banks_from_blocktree(ledger_path, config.account_paths.clone());
) = new_banks_from_blocktree(
ledger_path,
config.account_paths.clone(),
config.snapshot_path.clone(),
);
let leader_schedule_cache = Arc::new(leader_schedule_cache);
let exit = Arc::new(AtomicBool::new(false));
@ -108,7 +114,7 @@ impl Validator {
let blocktree = Arc::new(blocktree);
let poh_config = Arc::new(poh_config);
let (poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal(
let (mut poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
@ -120,6 +126,10 @@ impl Validator {
&leader_schedule_cache,
&poh_config,
);
if config.snapshot_path.is_some() {
poh_recorder.set_bank(&bank);
}
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &poh_config, &exit);
assert_eq!(
@ -280,9 +290,40 @@ impl Validator {
}
}
fn get_bank_forks(
genesis_block: &GenesisBlock,
blocktree: &Blocktree,
account_paths: Option<String>,
snapshot_path: Option<String>,
) -> (BankForks, Vec<BankForksInfo>, LeaderScheduleCache) {
if snapshot_path.is_some() {
let bank_forks = BankForks::load_from_snapshot(&snapshot_path);
match bank_forks {
Ok(v) => {
let bank = &v.working_bank();
let fork_info = BankForksInfo {
bank_slot: bank.slot(),
entry_height: bank.tick_height(),
};
return (v, vec![fork_info], LeaderScheduleCache::new_from_bank(bank));
}
Err(_) => warn!("Failed to load from snapshot, fallback to load from ledger"),
}
}
let (mut bank_forks, bank_forks_info, leader_schedule_cache) =
blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths)
.expect("process_blocktree failed");
if snapshot_path.is_some() {
bank_forks.set_snapshot_config(snapshot_path);
let _ = bank_forks.add_snapshot(0, 0);
}
(bank_forks, bank_forks_info, leader_schedule_cache)
}
pub fn new_banks_from_blocktree(
blocktree_path: &str,
account_paths: Option<String>,
snapshot_path: Option<String>,
) -> (
BankForks,
Vec<BankForksInfo>,
@ -300,8 +341,7 @@ pub fn new_banks_from_blocktree(
.expect("Expected to successfully open database ledger");
let (bank_forks, bank_forks_info, leader_schedule_cache) =
blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths)
.expect("process_blocktree failed");
get_bank_forks(&genesis_block, &blocktree, account_paths, snapshot_path);
(
bank_forks,

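The recovery order in get_bank_forks above is: try the snapshot first, and on any failure fall back to replaying the full ledger. A hedged sketch of just that control flow; load_snapshot and replay_ledger are hypothetical stand-ins for BankForks::load_from_snapshot and blocktree_processor::process_blocktree:

fn recover(snapshot_path: Option<&str>) -> u64 {
    if let Some(path) = snapshot_path {
        // Prefer the snapshot when one is configured.
        match load_snapshot(path) {
            Ok(root) => return root,
            Err(_) => println!("Failed to load from snapshot, fallback to load from ledger"),
        }
    }
    // No snapshot, or loading failed: rebuild state from the ledger.
    replay_ledger()
}

// Hypothetical stand-in: pretend the snapshot encodes a root slot.
fn load_snapshot(path: &str) -> std::io::Result<u64> {
    std::fs::read(path).map(|bytes| bytes.len() as u64)
}

// Hypothetical stand-in for full ledger replay.
fn replay_ledger() -> u64 {
    0
}

fn main() {
    assert_eq!(recover(None), 0);
}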

@ -1,9 +1,9 @@
use hashbrown::{HashMap, HashSet};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon::prelude::*;
use solana::cluster_info::{compute_retransmit_peers, ClusterInfo};
use solana::contact_info::ContactInfo;
use solana_sdk::pubkey::Pubkey;
use std::collections::{HashMap, HashSet};
use std::sync::mpsc::channel;
use std::sync::mpsc::TryRecvError;
use std::sync::mpsc::{Receiver, Sender};


@ -1,5 +1,4 @@
use bincode::serialized_size;
use hashbrown::HashMap;
use log::*;
use rayon::prelude::*;
use solana::contact_info::ContactInfo;
@ -11,6 +10,7 @@ use solana::crds_value::CrdsValueLabel;
use solana_sdk::hash::hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
type Node = Arc<Mutex<CrdsGossip>>;


@ -94,7 +94,7 @@ fn test_replay() {
completed_slots_receiver,
leader_schedule_cache,
_,
) = validator::new_banks_from_blocktree(&blocktree_path, None);
) = validator::new_banks_from_blocktree(&blocktree_path, None, None);
let working_bank = bank_forks.working_bank();
assert_eq!(
working_bank.get_balance(&mint_keypair.pubkey()),

run.sh

@ -40,9 +40,24 @@ export RUST_BACKTRACE=1
dataDir=$PWD/target/"$(basename "$0" .sh)"
set -x
solana-keygen -o "$dataDir"/config/leader-keypair.json
solana-keygen -o "$dataDir"/config/leader-vote-account-keypair.json
solana-keygen -o "$dataDir"/config/leader-stake-account-keypair.json
leader_keypair="$dataDir/config/leader-keypair.json"
if [[ -e $leader_keypair ]]; then
echo "Use existing leader keypair"
else
solana-keygen -o "$leader_keypair"
fi
leader_vote_account_keypair="$dataDir/config/leader-vote-account-keypair.json"
if [[ -e $leader_vote_account_keypair ]]; then
echo "Use existing leader vote account keypair"
else
solana-keygen -o "$leader_vote_account_keypair"
fi
leader_stake_account_keypair="$dataDir/config/leader-stake-account-keypair.json"
if [[ -e $leader_stake_account_keypair ]]; then
echo "Use existing leader stake account keypair"
else
solana-keygen -o "$leader_stake_account_keypair"
fi
solana-keygen -o "$dataDir"/config/drone-keypair.json
solana-keygen -o "$dataDir"/config/leader-storage-account-keypair.json
@ -76,6 +91,8 @@ args=(
--gossip-port 8001
--rpc-port 8899
--rpc-drone-address 127.0.0.1:9900
--accounts "$dataDir"/accounts
--snapshot-path "$dataDir"/snapshots
)
if [[ -n $blockstreamSocket ]]; then
args+=(--blockstream "$blockstreamSocket")


@ -0,0 +1,33 @@
#![feature(test)]
extern crate test;
use bincode::serialize;
use solana_runtime::status_cache::*;
use solana_sdk::hash::{hash, Hash};
use solana_sdk::signature::Signature;
use test::Bencher;
type BankStatusCache = StatusCache<()>;
#[bench]
fn test_statuscache_serialize(bencher: &mut Bencher) {
let mut status_cache = BankStatusCache::default();
status_cache.add_root(0);
status_cache.clear_signatures();
for hash_index in 0..100 {
let blockhash = Hash::new(&vec![hash_index; std::mem::size_of::<Hash>()]);
let mut id = blockhash;
for _ in 0..100 {
id = hash(id.as_ref());
let mut sigbytes = Vec::from(id.as_ref());
id = hash(id.as_ref());
sigbytes.extend(id.as_ref());
let sig = Signature::new(&sigbytes);
status_cache.insert(&blockhash, &sig, 0, ());
}
}
bencher.iter(|| {
let _ = serialize(&status_cache).unwrap();
});
}


@ -1,13 +1,13 @@
use crate::accounts_db::{
get_paths_vec, AccountInfo, AccountStorage, AccountsDB, ErrorCounters, InstructionAccounts,
InstructionLoaders,
get_paths_vec, AccountInfo, AccountStorage, AccountsDB, AppendVecId, ErrorCounters,
InstructionAccounts, InstructionLoaders,
};
use crate::accounts_index::{AccountsIndex, Fork};
use crate::append_vec::StoredAccount;
use crate::message_processor::has_duplicates;
use bincode::serialize;
use hashbrown::{HashMap, HashSet};
use log::*;
use serde::{Deserialize, Serialize};
use solana_metrics::inc_new_counter_error;
use solana_sdk::account::Account;
use solana_sdk::fee_calculator::FeeCalculator;
@ -19,6 +19,7 @@ use solana_sdk::system_program;
use solana_sdk::transaction::Result;
use solana_sdk::transaction::{Transaction, TransactionError};
use std::borrow::Borrow;
use std::collections::{HashMap, HashSet};
use std::env;
use std::fs::remove_dir_all;
use std::iter::once;
@ -51,15 +52,18 @@ type RecordLocks = (
);
/// This structure handles synchronization for db
#[derive(Default)]
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct Accounts {
/// Single global AccountsDB
#[serde(skip)]
pub accounts_db: Arc<AccountsDB>,
/// set of accounts which are currently in the pipeline
#[serde(skip)]
account_locks: AccountLocks,
/// set of accounts which are about to record + commit
#[serde(skip)]
record_locks: Mutex<RecordLocks>,
/// List of persistent stores
@ -72,16 +76,16 @@ pub struct Accounts {
impl Drop for Accounts {
fn drop(&mut self) {
let paths = get_paths_vec(&self.paths);
paths.iter().for_each(|p| {
let _ignored = remove_dir_all(p);
if self.own_paths {
let paths = get_paths_vec(&self.paths);
paths.iter().for_each(|p| {
let _ignored = remove_dir_all(p);
// it is safe to delete the parent
if self.own_paths {
// it is safe to delete the parent
let path = Path::new(p);
let _ignored = remove_dir_all(path.parent().unwrap());
}
});
});
}
}
}
@ -330,7 +334,9 @@ impl Accounts {
pub fn load_by_program(&self, fork: Fork, program_id: &Pubkey) -> Vec<(Pubkey, Account)> {
let accumulator: Vec<Vec<(Pubkey, u64, Account)>> = self.accounts_db.scan_account_storage(
fork,
|stored_account: &StoredAccount, accum: &mut Vec<(Pubkey, u64, Account)>| {
|stored_account: &StoredAccount,
_id: AppendVecId,
accum: &mut Vec<(Pubkey, u64, Account)>| {
if stored_account.balance.owner == *program_id {
let val = (
stored_account.meta.pubkey,
@ -441,7 +447,9 @@ impl Accounts {
pub fn hash_internal_state(&self, fork_id: Fork) -> Option<Hash> {
let accumulator: Vec<Vec<(Pubkey, u64, Hash)>> = self.accounts_db.scan_account_storage(
fork_id,
|stored_account: &StoredAccount, accum: &mut Vec<(Pubkey, u64, Hash)>| {
|stored_account: &StoredAccount,
_id: AppendVecId,
accum: &mut Vec<(Pubkey, u64, Hash)>| {
accum.push((
stored_account.meta.pubkey,
stored_account.meta.write_version,
@ -586,11 +594,14 @@ mod tests {
// TODO: all the bank tests are bank specific, issue: 2194
use super::*;
use bincode::{deserialize_from, serialize_into, serialized_size};
use rand::{thread_rng, Rng};
use solana_sdk::account::Account;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::CompiledInstruction;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::transaction::Transaction;
use std::io::Cursor;
use std::thread::{sleep, Builder};
use std::time::Duration;
@ -1156,4 +1167,51 @@ mod tests {
assert!(parent_record_locks2.is_empty());
}
}
fn create_accounts(accounts: &Accounts, pubkeys: &mut Vec<Pubkey>, num: usize) {
for t in 0..num {
let pubkey = Pubkey::new_rand();
let account = Account::new((t + 1) as u64, 0, &Account::default().owner);
accounts.store_slow(0, &pubkey, &account);
pubkeys.push(pubkey.clone());
}
}
fn check_accounts(accounts: &Accounts, pubkeys: &Vec<Pubkey>, num: usize) {
for _ in 1..num {
let idx = thread_rng().gen_range(0, num - 1);
let ancestors = vec![(0, 0)].into_iter().collect();
let account = accounts.load_slow(&ancestors, &pubkeys[idx]).unwrap();
let account1 = Account::new((idx + 1) as u64, 0, &Account::default().owner);
assert_eq!(account, (account1, 0));
}
}
#[test]
fn test_accounts_serialize() {
solana_logger::setup();
let accounts = Accounts::new(Some("serialize_accounts".to_string()));
let mut pubkeys: Vec<Pubkey> = vec![];
create_accounts(&accounts, &mut pubkeys, 100);
check_accounts(&accounts, &pubkeys, 100);
accounts.add_root(0);
let sz =
serialized_size(&accounts).unwrap() + serialized_size(&*accounts.accounts_db).unwrap();
let mut buf = vec![0u8; sz as usize];
let mut writer = Cursor::new(&mut buf[..]);
serialize_into(&mut writer, &accounts).unwrap();
serialize_into(&mut writer, &*accounts.accounts_db).unwrap();
let mut reader = Cursor::new(&mut buf[..]);
let mut daccounts: Accounts = deserialize_from(&mut reader).unwrap();
let accounts_db: AccountsDB = deserialize_from(&mut reader).unwrap();
daccounts.accounts_db = Arc::new(accounts_db);
check_accounts(&daccounts, &pubkeys, 100);
assert_eq!(
accounts.hash_internal_state(0),
daccounts.hash_internal_state(0)
);
}
}
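
test_accounts_serialize above leans on a skip-and-reattach scheme: the Arc<AccountsDB> field carries #[serde(skip)], is written as a second payload right after the Accounts struct, and is re-attached after deserialization. A minimal sketch of the same pattern with toy types (Db and Outer are illustrative only):

use serde::{Deserialize, Serialize};
use std::io::Cursor;
use std::sync::Arc;

#[derive(Default, Serialize, Deserialize)]
struct Db {
    entries: u64,
}

#[derive(Serialize, Deserialize)]
struct Outer {
    #[serde(skip)] // not part of Outer's own payload
    db: Arc<Db>,
    paths: String,
}

fn main() {
    let outer = Outer { db: Arc::new(Db { entries: 7 }), paths: "p".into() };

    // Stage 1: the outer struct (without db). Stage 2: the db itself.
    let mut buf: Vec<u8> = Vec::new();
    bincode::serialize_into(&mut buf, &outer).unwrap();
    bincode::serialize_into(&mut buf, &*outer.db).unwrap();

    // Read both payloads back and re-attach the skipped field.
    let mut rd = Cursor::new(&buf[..]);
    let mut restored: Outer = bincode::deserialize_from(&mut rd).unwrap();
    let db: Db = bincode::deserialize_from(&mut rd).unwrap();
    restored.db = Arc::new(db);
    assert_eq!(restored.db.entries, 7);
}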


@ -20,14 +20,20 @@
use crate::accounts_index::{AccountsIndex, Fork};
use crate::append_vec::{AppendVec, StorageMeta, StoredAccount};
use hashbrown::{HashMap, HashSet};
use bincode::{deserialize_from, serialize_into, serialized_size};
use log::*;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use rayon::ThreadPool;
use serde::de::{MapAccess, Visitor};
use serde::ser::{SerializeMap, Serializer};
use serde::{Deserialize, Serialize};
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::fs::{create_dir_all, remove_dir_all};
use std::io::Cursor;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
@ -53,7 +59,7 @@ pub struct ErrorCounters {
pub missing_signature_for_fee: usize,
}
#[derive(Default, Clone)]
#[derive(Deserialize, Serialize, Default, Debug, PartialEq, Clone)]
pub struct AccountInfo {
/// index identifying the append storage
id: AppendVecId,
@ -66,18 +72,67 @@ pub struct AccountInfo {
lamports: u64,
}
/// An offset into the AccountsDB::storage vector
type AppendVecId = usize;
pub type AccountStorage = HashMap<usize, Arc<AccountStorageEntry>>;
pub type AppendVecId = usize;
pub type InstructionAccounts = Vec<Account>;
pub type InstructionLoaders = Vec<Vec<(Pubkey, Account)>>;
#[derive(Debug, PartialEq, Clone, Copy)]
#[derive(Default, Debug)]
pub struct AccountStorage(HashMap<usize, Arc<AccountStorageEntry>>);
struct AccountStorageVisitor;
impl<'de> Visitor<'de> for AccountStorageVisitor {
type Value = AccountStorage;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Expecting AccountStorage")
}
#[allow(clippy::mutex_atomic)]
fn visit_map<M>(self, mut access: M) -> Result<Self::Value, M::Error>
where
M: MapAccess<'de>,
{
let mut map = HashMap::new();
while let Some((key, value)) = access.next_entry()? {
map.insert(key, Arc::new(value));
}
Ok(AccountStorage(map))
}
}
impl Serialize for AccountStorage {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(Some(self.0.len()))?;
for (k, v) in &self.0 {
map.serialize_entry(k, &**v)?;
}
map.end()
}
}
impl<'de> Deserialize<'de> for AccountStorage {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_map(AccountStorageVisitor)
}
}
#[derive(Debug, PartialEq, Copy, Clone, Deserialize, Serialize)]
pub enum AccountStorageStatus {
StorageAvailable = 0,
StorageFull = 1,
}
/// Persistent storage structure holding the accounts
#[derive(Debug, Deserialize, Serialize)]
pub struct AccountStorageEntry {
id: AppendVecId,
@ -96,7 +151,7 @@ pub struct AccountStorageEntry {
impl AccountStorageEntry {
pub fn new(path: &str, fork_id: Fork, id: usize, file_size: u64) -> Self {
let p = format!("{}/{}", path, id);
let p = format!("{}/{}.{}", path, fork_id, id);
let path = Path::new(&p);
let _ignored = remove_dir_all(path);
create_dir_all(path).expect("Create directory failed");
@ -169,6 +224,7 @@ impl AccountStorageEntry {
}
// This structure handles the load/store of the accounts
#[derive(Debug)]
pub struct AccountsDB {
/// Keeps tracks of index into AppendVec on a per fork basis
pub accounts_index: RwLock<AccountsIndex<AccountInfo>>,
@ -200,7 +256,7 @@ impl Default for AccountsDB {
fn default() -> Self {
AccountsDB {
accounts_index: RwLock::new(AccountsIndex::default()),
storage: RwLock::new(HashMap::new()),
storage: RwLock::new(AccountStorage(HashMap::new())),
next_id: AtomicUsize::new(0),
write_version: AtomicUsize::new(0),
paths: Vec::default(),
@ -218,7 +274,7 @@ impl AccountsDB {
let paths = get_paths_vec(&paths);
AccountsDB {
accounts_index: RwLock::new(AccountsIndex::default()),
storage: RwLock::new(HashMap::new()),
storage: RwLock::new(AccountStorage(HashMap::new())),
next_id: AtomicUsize::new(0),
write_version: AtomicUsize::new(0),
paths,
@ -244,7 +300,7 @@ impl AccountsDB {
}
pub fn has_accounts(&self, fork: Fork) -> bool {
for x in self.storage.read().unwrap().values() {
for x in self.storage.read().unwrap().0.values() {
if x.fork_id == fork && x.count() > 0 {
return true;
}
@ -256,7 +312,7 @@ impl AccountsDB {
// PERF: Sequentially read each storage entry in parallel
pub fn scan_account_storage<F, B>(&self, fork_id: Fork, scan_func: F) -> Vec<B>
where
F: Fn(&StoredAccount, &mut B) -> (),
F: Fn(&StoredAccount, AppendVecId, &mut B) -> (),
F: Send + Sync,
B: Send + Default,
{
@ -264,6 +320,7 @@ impl AccountsDB {
.storage
.read()
.unwrap()
.0
.values()
.filter(|store| store.fork_id == fork_id)
.cloned()
@ -274,9 +331,9 @@ impl AccountsDB {
.map(|storage| {
let accounts = storage.accounts.accounts(0);
let mut retval = B::default();
accounts
.iter()
.for_each(|stored_account| scan_func(stored_account, &mut retval));
accounts.iter().for_each(|stored_account| {
scan_func(stored_account, storage.id, &mut retval)
});
retval
})
.collect()
@ -292,6 +349,7 @@ impl AccountsDB {
let (info, fork) = accounts_index.get(pubkey, ancestors)?;
//TODO: thread this as a ref
storage
.0
.get(&info.id)
.and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account()))
.map(|account| (account, fork))
@ -311,6 +369,7 @@ impl AccountsDB {
let mut candidates: Vec<Arc<AccountStorageEntry>> = {
let stores = self.storage.read().unwrap();
stores
.0
.values()
.filter_map(|x| {
if x.status() == AccountStorageStatus::StorageAvailable && x.fork_id == fork_id
@ -326,7 +385,7 @@ impl AccountsDB {
let mut stores = self.storage.write().unwrap();
let path_index = thread_rng().gen_range(0, self.paths.len());
let storage = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_index]));
stores.insert(storage.id, storage.clone());
stores.0.insert(storage.id, storage.clone());
candidates.push(storage);
}
let rv = thread_rng().gen_range(0, candidates.len());
@ -336,9 +395,8 @@ impl AccountsDB {
pub fn purge_fork(&self, fork: Fork) {
//add_root should be called first
let is_root = self.accounts_index.read().unwrap().is_root(fork);
trace!("PURGING {} {}", fork, is_root);
if !is_root {
self.storage.write().unwrap().retain(|_, v| {
self.storage.write().unwrap().0.retain(|_, v| {
trace!("PURGING {} {}", v.fork_id, fork);
v.fork_id != fork
});
@ -400,7 +458,7 @@ impl AccountsDB {
fn remove_dead_accounts(&self, reclaims: Vec<(Fork, AccountInfo)>) -> HashSet<Fork> {
let storage = self.storage.read().unwrap();
for (fork_id, account_info) in reclaims {
if let Some(store) = storage.get(&account_info.id) {
if let Some(store) = storage.0.get(&account_info.id) {
assert_eq!(
fork_id, store.fork_id,
"AccountDB::accounts_index corrupted. Storage should only point to one fork"
@ -410,6 +468,7 @@ impl AccountsDB {
}
//TODO: performance here could be improved if AccountsDB::storage was organized by fork
let dead_forks: HashSet<Fork> = storage
.0
.values()
.filter_map(|x| {
if x.count() == 0 {
@ -420,6 +479,7 @@ impl AccountsDB {
})
.collect();
let live_forks: HashSet<Fork> = storage
.0
.values()
.filter_map(|x| if x.count() > 0 { Some(x.fork_id) } else { None })
.collect();
@ -451,12 +511,147 @@ impl AccountsDB {
pub fn add_root(&self, fork: Fork) {
self.accounts_index.write().unwrap().add_root(fork)
}
fn merge(
dest: &mut HashMap<Pubkey, (u64, AccountInfo)>,
source: &HashMap<Pubkey, (u64, AccountInfo)>,
) {
for (key, (source_version, source_info)) in source.iter() {
if let Some((dest_version, _)) = dest.get(key) {
if dest_version > source_version {
continue;
}
}
dest.insert(*key, (*source_version, source_info.clone()));
}
}
fn generate_index(&mut self) {
let mut forks: Vec<Fork> = self
.storage
.read()
.unwrap()
.0
.values()
.map(|x| x.fork_id)
.collect();
forks.sort();
for fork_id in forks.iter() {
let mut accumulator: Vec<HashMap<Pubkey, (u64, AccountInfo)>> = self
.scan_account_storage(
*fork_id,
|stored_account: &StoredAccount,
id: AppendVecId,
accum: &mut HashMap<Pubkey, (u64, AccountInfo)>| {
let account_info = AccountInfo {
id,
offset: stored_account.offset,
lamports: stored_account.balance.lamports,
};
accum.insert(
stored_account.meta.pubkey,
(stored_account.meta.write_version, account_info),
);
},
);
let mut account_maps = accumulator.pop().unwrap();
while let Some(maps) = accumulator.pop() {
AccountsDB::merge(&mut account_maps, &maps);
}
let mut accounts_index = self.accounts_index.write().unwrap();
for (pubkey, (_, account_info)) in account_maps.iter() {
accounts_index.add_index(*fork_id, pubkey, account_info.clone());
}
}
}
}
impl Serialize for AccountsDB {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
use serde::ser::Error;
let len = serialized_size(&self.accounts_index).unwrap()
+ serialized_size(&self.paths).unwrap()
+ serialized_size(&self.storage).unwrap()
+ std::mem::size_of::<u64>() as u64
+ serialized_size(&self.file_size).unwrap();
let mut buf = vec![0u8; len as usize];
let mut wr = Cursor::new(&mut buf[..]);
serialize_into(&mut wr, &self.accounts_index).map_err(Error::custom)?;
serialize_into(&mut wr, &self.paths).map_err(Error::custom)?;
serialize_into(&mut wr, &self.storage).map_err(Error::custom)?;
serialize_into(
&mut wr,
&(self.write_version.load(Ordering::Relaxed) as u64),
)
.map_err(Error::custom)?;
serialize_into(&mut wr, &self.file_size).map_err(Error::custom)?;
let len = wr.position() as usize;
serializer.serialize_bytes(&wr.into_inner()[..len])
}
}
struct AccountsDBVisitor;
impl<'a> serde::de::Visitor<'a> for AccountsDBVisitor {
type Value = AccountsDB;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Expecting AccountsDB")
}
fn visit_bytes<E>(self, data: &[u8]) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
use serde::de::Error;
let mut rd = Cursor::new(&data[..]);
let accounts_index: RwLock<AccountsIndex<AccountInfo>> =
deserialize_from(&mut rd).map_err(Error::custom)?;
let paths: Vec<String> = deserialize_from(&mut rd).map_err(Error::custom)?;
let storage: RwLock<AccountStorage> = deserialize_from(&mut rd).map_err(Error::custom)?;
let write_version: u64 = deserialize_from(&mut rd).map_err(Error::custom)?;
let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?;
let mut ids: Vec<usize> = storage.read().unwrap().0.keys().cloned().collect();
ids.sort();
let mut accounts_db = AccountsDB {
accounts_index,
storage,
next_id: AtomicUsize::new(ids[ids.len() - 1] + 1),
write_version: AtomicUsize::new(write_version as usize),
paths,
file_size,
thread_pool: rayon::ThreadPoolBuilder::new()
.num_threads(2)
.build()
.unwrap(),
};
accounts_db.generate_index();
Ok(accounts_db)
}
}
impl<'de> Deserialize<'de> for AccountsDB {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: ::serde::Deserializer<'de>,
{
deserializer.deserialize_bytes(AccountsDBVisitor)
}
}
#[cfg(test)]
mod tests {
// TODO: all the bank tests are bank specific, issue: 2194
use super::*;
use bincode::{deserialize_from, serialize_into, serialized_size};
use rand::{thread_rng, Rng};
use solana_sdk::account::Account;
@ -654,16 +849,16 @@ mod tests {
db.store(1, &[(&pubkeys[0], &account)]);
{
let stores = db.storage.read().unwrap();
assert_eq!(stores.len(), 2);
assert_eq!(stores[&0].count(), 2);
assert_eq!(stores[&1].count(), 2);
assert_eq!(stores.0.len(), 2);
assert_eq!(stores.0[&0].count(), 2);
assert_eq!(stores.0[&1].count(), 2);
}
db.add_root(1);
{
let stores = db.storage.read().unwrap();
assert_eq!(stores.len(), 2);
assert_eq!(stores[&0].count(), 2);
assert_eq!(stores[&1].count(), 2);
assert_eq!(stores.0.len(), 2);
assert_eq!(stores.0[&0].count(), 2);
assert_eq!(stores.0[&1].count(), 2);
}
}
@ -736,19 +931,40 @@ mod tests {
fn check_storage(accounts: &AccountsDB, count: usize) -> bool {
let stores = accounts.storage.read().unwrap();
assert_eq!(stores.len(), 1);
assert_eq!(stores[&0].status(), AccountStorageStatus::StorageAvailable);
stores[&0].count() == count
assert_eq!(stores.0.len(), 1);
assert_eq!(
stores.0[&0].status(),
AccountStorageStatus::StorageAvailable
);
stores.0[&0].count() == count
}
fn check_accounts(accounts: &AccountsDB, pubkeys: &Vec<Pubkey>, fork: Fork) {
for _ in 1..100 {
let idx = thread_rng().gen_range(0, 99);
fn check_accounts(
accounts: &AccountsDB,
pubkeys: &Vec<Pubkey>,
fork: Fork,
num: usize,
count: usize,
) {
for _ in 1..num {
let idx = thread_rng().gen_range(0, num - 1);
let ancestors = vec![(fork, 0)].into_iter().collect();
let account = accounts.load_slow(&ancestors, &pubkeys[idx]).unwrap();
let mut default_account = Account::default();
default_account.lamports = (idx + 1) as u64;
assert_eq!((default_account, 0), account);
let account1 = Account::new((idx + count) as u64, 0, &Account::default().owner);
assert_eq!(account, (account1, fork));
}
}
fn modify_accounts(
accounts: &AccountsDB,
pubkeys: &Vec<Pubkey>,
fork: Fork,
num: usize,
count: usize,
) {
for idx in 0..num {
let account = Account::new((idx + count) as u64, 0, &Account::default().owner);
accounts.store(fork, &[(&pubkeys[idx], &account)]);
}
}
@ -771,7 +987,7 @@ mod tests {
let accounts = AccountsDB::new(&paths.paths);
let mut pubkeys: Vec<Pubkey> = vec![];
create_account(&accounts, &mut pubkeys, 0, 100, 0, 0);
check_accounts(&accounts, &pubkeys, 0);
check_accounts(&accounts, &pubkeys, 0, 100, 1);
}
#[test]
@ -805,7 +1021,7 @@ mod tests {
}
let mut append_vec_histogram = HashMap::new();
for storage in accounts.storage.read().unwrap().values() {
for storage in accounts.storage.read().unwrap().0.values() {
*append_vec_histogram.entry(storage.fork_id).or_insert(0) += 1;
}
for count in append_vec_histogram.values() {
@ -827,9 +1043,12 @@ mod tests {
accounts.store(0, &[(&pubkey1, &account1)]);
{
let stores = accounts.storage.read().unwrap();
assert_eq!(stores.len(), 1);
assert_eq!(stores[&0].count(), 1);
assert_eq!(stores[&0].status(), AccountStorageStatus::StorageAvailable);
assert_eq!(stores.0.len(), 1);
assert_eq!(stores.0[&0].count(), 1);
assert_eq!(
stores.0[&0].status(),
AccountStorageStatus::StorageAvailable
);
}
let pubkey2 = Pubkey::new_rand();
@ -837,11 +1056,14 @@ mod tests {
accounts.store(0, &[(&pubkey2, &account2)]);
{
let stores = accounts.storage.read().unwrap();
assert_eq!(stores.len(), 2);
assert_eq!(stores[&0].count(), 1);
assert_eq!(stores[&0].status(), AccountStorageStatus::StorageFull);
assert_eq!(stores[&1].count(), 1);
assert_eq!(stores[&1].status(), AccountStorageStatus::StorageAvailable);
assert_eq!(stores.0.len(), 2);
assert_eq!(stores.0[&0].count(), 1);
assert_eq!(stores.0[&0].status(), AccountStorageStatus::StorageFull);
assert_eq!(stores.0[&1].count(), 1);
assert_eq!(
stores.0[&1].status(),
AccountStorageStatus::StorageAvailable
);
}
let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!(
@ -859,13 +1081,13 @@ mod tests {
accounts.store(0, &[(&pubkey1, &account1)]);
{
let stores = accounts.storage.read().unwrap();
assert_eq!(stores.len(), 3);
assert_eq!(stores[&0].count(), count[index]);
assert_eq!(stores[&0].status(), status[0]);
assert_eq!(stores[&1].count(), 1);
assert_eq!(stores[&1].status(), status[1]);
assert_eq!(stores[&2].count(), count[index ^ 1]);
assert_eq!(stores[&2].status(), status[0]);
assert_eq!(stores.0.len(), 3);
assert_eq!(stores.0[&0].count(), count[index]);
assert_eq!(stores.0[&0].status(), status[0]);
assert_eq!(stores.0[&1].count(), 1);
assert_eq!(stores.0[&1].status(), status[1]);
assert_eq!(stores.0[&2].count(), count[index ^ 1]);
assert_eq!(stores.0[&2].status(), status[0]);
}
let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!(
@ -928,17 +1150,42 @@ mod tests {
assert!(accounts.accounts_index.read().unwrap().is_purged(0));
//fork is still there, since gc is lazy
assert!(accounts.storage.read().unwrap().get(&info.id).is_some());
assert!(accounts.storage.read().unwrap().0.get(&info.id).is_some());
//store causes cleanup
accounts.store(1, &[(&pubkey, &account)]);
//fork is gone
assert!(accounts.storage.read().unwrap().get(&info.id).is_none());
assert!(accounts.storage.read().unwrap().0.get(&info.id).is_none());
//new value is there
let ancestors = vec![(1, 1)].into_iter().collect();
assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1)));
}
#[test]
fn test_accounts_db_serialize() {
solana_logger::setup();
let paths = get_tmp_accounts_path!();
let accounts = AccountsDB::new(&paths.paths);
let mut pubkeys: Vec<Pubkey> = vec![];
create_account(&accounts, &mut pubkeys, 0, 100, 0, 0);
assert_eq!(check_storage(&accounts, 100), true);
check_accounts(&accounts, &pubkeys, 0, 100, 1);
modify_accounts(&accounts, &pubkeys, 0, 100, 2);
check_accounts(&accounts, &pubkeys, 0, 100, 2);
accounts.add_root(0);
let mut pubkeys1: Vec<Pubkey> = vec![];
create_account(&accounts, &mut pubkeys1, 1, 10, 0, 0);
let mut buf = vec![0u8; serialized_size(&accounts).unwrap() as usize];
let mut writer = Cursor::new(&mut buf[..]);
serialize_into(&mut writer, &accounts).unwrap();
let mut reader = Cursor::new(&mut buf[..]);
let daccounts: AccountsDB = deserialize_from(&mut reader).unwrap();
check_accounts(&daccounts, &pubkeys, 0, 100, 2);
check_accounts(&daccounts, &pubkeys1, 1, 10, 1);
}
}
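
The custom serde code for AccountsDB above (and for AppendVec below) follows a serialize-as-bytes pattern: pack several fields into one bincode buffer, emit it through serializer.serialize_bytes, and unpack it inside a visit_bytes visitor. A self-contained sketch of the same pattern on a two-field toy struct (Pair is illustrative only):

use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;
use std::io::Cursor;

struct Pair {
    a: u64,
    b: u64,
}

impl Serialize for Pair {
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        use serde::ser::Error;
        // Pack both fields into one opaque byte payload.
        let mut buf: Vec<u8> = Vec::new();
        bincode::serialize_into(&mut buf, &self.a).map_err(Error::custom)?;
        bincode::serialize_into(&mut buf, &self.b).map_err(Error::custom)?;
        serializer.serialize_bytes(&buf)
    }
}

struct PairVisitor;

impl<'de> Visitor<'de> for PairVisitor {
    type Value = Pair;

    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str("Expecting Pair")
    }

    fn visit_bytes<E: serde::de::Error>(self, data: &[u8]) -> Result<Pair, E> {
        // Unpack the fields in the same order they were written.
        let mut rd = Cursor::new(data);
        let a = bincode::deserialize_from(&mut rd).map_err(E::custom)?;
        let b = bincode::deserialize_from(&mut rd).map_err(E::custom)?;
        Ok(Pair { a, b })
    }
}

impl<'de> Deserialize<'de> for Pair {
    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Pair, D::Error> {
        deserializer.deserialize_bytes(PairVisitor)
    }
}

fn main() {
    let bytes = bincode::serialize(&Pair { a: 1, b: 2 }).unwrap();
    let pair: Pair = bincode::deserialize(&bytes).unwrap();
    assert_eq!((pair.a, pair.b), (1, 2));
}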


@ -1,13 +1,17 @@
use hashbrown::{HashMap, HashSet};
use log::*;
use serde::{Deserialize, Serialize};
use solana_sdk::pubkey::Pubkey;
use std::collections::{HashMap, HashSet};
pub type Fork = u64;
#[derive(Default)]
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct AccountsIndex<T> {
account_maps: HashMap<Pubkey, Vec<(Fork, T)>>,
roots: HashSet<Fork>,
#[serde(skip)]
pub account_maps: HashMap<Pubkey, Vec<(Fork, T)>>,
pub roots: HashSet<Fork>,
// This value needs to be stored to recover the index from AppendVec
pub last_root: Fork,
}
@ -35,7 +39,7 @@ impl<T: Clone> AccountsIndex<T> {
let mut rv = vec![];
let mut fork_vec: Vec<(Fork, T)> = vec![];
{
let entry = self.account_maps.entry(*pubkey).or_insert(vec![]);
let entry = self.account_maps.entry(*pubkey).or_insert_with(|| vec![]);
std::mem::swap(entry, &mut fork_vec);
};
@ -54,11 +58,17 @@ impl<T: Clone> AccountsIndex<T> {
);
fork_vec.retain(|(fork, _)| !self.is_purged(*fork));
{
let entry = self.account_maps.entry(*pubkey).or_insert(vec![]);
let entry = self.account_maps.entry(*pubkey).or_insert_with(|| vec![]);
std::mem::swap(entry, &mut fork_vec);
};
rv
}
pub fn add_index(&mut self, fork: Fork, pubkey: &Pubkey, account_info: T) {
let entry = self.account_maps.entry(*pubkey).or_insert_with(|| vec![]);
entry.push((fork, account_info));
}
pub fn is_purged(&self, fork: Fork) -> bool {
fork < self.last_root
}


@ -1,10 +1,13 @@
use bincode::{deserialize_from, serialize_into, serialized_size};
use memmap::MmapMut;
use serde::{Deserialize, Serialize};
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use std::fmt;
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};
use std::io::{Cursor, Seek, SeekFrom, Write};
use std::mem;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
@ -26,7 +29,7 @@ pub struct StorageMeta {
pub data_len: u64,
}
#[derive(Serialize, Deserialize, Clone, Default, Eq, PartialEq)]
#[derive(Serialize, Deserialize, Clone, Debug, Default, Eq, PartialEq)]
pub struct AccountBalance {
/// lamports in the account
pub lamports: u64,
@ -38,11 +41,13 @@ pub struct AccountBalance {
/// References to Memory Mapped memory
/// The Account is stored separately from its data, so getting the actual account requires a clone
#[derive(PartialEq, Debug)]
pub struct StoredAccount<'a> {
pub meta: &'a StorageMeta,
/// account data
pub balance: &'a AccountBalance,
pub data: &'a [u8],
pub offset: usize,
}
impl<'a> StoredAccount<'a> {
@ -56,7 +61,10 @@ impl<'a> StoredAccount<'a> {
}
}
#[derive(Debug)]
#[allow(clippy::mutex_atomic)]
pub struct AppendVec {
path: PathBuf,
map: MmapMut,
// This mutex forces append to be single threaded, but concurrent with reads
append_offset: Mutex<usize>,
@ -64,6 +72,12 @@ pub struct AppendVec {
file_size: u64,
}
impl Drop for AppendVec {
fn drop(&mut self) {
let _ignored = std::fs::remove_dir_all(&self.path.parent().unwrap());
}
}
impl AppendVec {
#[allow(clippy::mutex_atomic)]
pub fn new(file: &Path, create: bool, size: usize) -> Self {
@ -82,6 +96,7 @@ impl AppendVec {
let map = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") };
AppendVec {
path: file.to_path_buf(),
map,
// This mutex forces append to be single threaded, but concurrent with reads
// See UNSAFE usage in `append_ptr`
@ -184,6 +199,7 @@ impl AppendVec {
meta,
balance,
data,
offset,
},
next,
))
@ -259,13 +275,13 @@ pub mod test_utils {
fn drop(&mut self) {
let mut path = PathBuf::new();
std::mem::swap(&mut path, &mut self.path);
let _ = std::fs::remove_file(path);
let _ignored = std::fs::remove_file(path);
}
}
pub fn get_append_vec_path(path: &str) -> TempFile {
let out_dir =
std::env::var("OUT_DIR").unwrap_or_else(|_| "/tmp/append_vec_tests".to_string());
std::env::var("OUT_DIR").unwrap_or_else(|_| "target/append_vec_tests".to_string());
let mut buf = PathBuf::new();
let rand_string: String = thread_rng().sample_iter(&Alphanumeric).take(30).collect();
buf.push(&format!("{}/{}{}", out_dir, path, rand_string));
@ -286,6 +302,82 @@ pub mod test_utils {
}
}
#[allow(clippy::mutex_atomic)]
impl Serialize for AppendVec {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
use serde::ser::Error;
let len = serialized_size(&self.path).unwrap()
+ std::mem::size_of::<u64>() as u64
+ std::mem::size_of::<u64>() as u64
+ std::mem::size_of::<usize>() as u64;
let mut buf = vec![0u8; len as usize];
let mut wr = Cursor::new(&mut buf[..]);
serialize_into(&mut wr, &self.path).map_err(Error::custom)?;
serialize_into(&mut wr, &(self.current_len.load(Ordering::Relaxed) as u64))
.map_err(Error::custom)?;
serialize_into(&mut wr, &self.file_size).map_err(Error::custom)?;
let offset = *self.append_offset.lock().unwrap();
serialize_into(&mut wr, &offset).map_err(Error::custom)?;
let len = wr.position() as usize;
serializer.serialize_bytes(&wr.into_inner()[..len])
}
}
struct AppendVecVisitor;
impl<'a> serde::de::Visitor<'a> for AppendVecVisitor {
type Value = AppendVec;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Expecting AppendVec")
}
#[allow(clippy::mutex_atomic)]
fn visit_bytes<E>(self, data: &[u8]) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
use serde::de::Error;
let mut rd = Cursor::new(&data[..]);
let path: PathBuf = deserialize_from(&mut rd).map_err(Error::custom)?;
let current_len: u64 = deserialize_from(&mut rd).map_err(Error::custom)?;
let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?;
let offset: usize = deserialize_from(&mut rd).map_err(Error::custom)?;
let data = OpenOptions::new()
.read(true)
.write(true)
.create(false)
.open(path.as_path());
if data.is_err() {
std::fs::create_dir_all(&path.parent().unwrap()).expect("Create directory failed");
return Ok(AppendVec::new(&path, true, file_size as usize));
}
let map = unsafe { MmapMut::map_mut(&data.unwrap()).expect("failed to map the data file") };
Ok(AppendVec {
path,
map,
append_offset: Mutex::new(offset),
current_len: AtomicUsize::new(current_len as usize),
file_size,
})
}
}
impl<'de> Deserialize<'de> for AppendVec {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: ::serde::Deserializer<'de>,
{
deserializer.deserialize_bytes(AppendVecVisitor)
}
}
#[cfg(test)]
pub mod tests {
use super::test_utils::*;
@ -355,4 +447,30 @@ pub mod tests {
duration_as_ms(&now.elapsed()),
);
}
#[test]
fn test_append_vec_serialize() {
let path = Path::new("append_vec_serialize");
let av: AppendVec = AppendVec::new(path, true, 1024 * 1024);
let account1 = create_test_account(1);
let index1 = av.append_account_test(&account1).unwrap();
assert_eq!(index1, 0);
assert_eq!(av.get_account_test(index1).unwrap(), account1);
let account2 = create_test_account(2);
let index2 = av.append_account_test(&account2).unwrap();
assert_eq!(av.get_account_test(index2).unwrap(), account2);
assert_eq!(av.get_account_test(index1).unwrap(), account1);
let mut buf = vec![0u8; serialized_size(&av).unwrap() as usize];
let mut writer = Cursor::new(&mut buf[..]);
serialize_into(&mut writer, &av).unwrap();
let mut reader = Cursor::new(&mut buf[..]);
let dav: AppendVec = deserialize_from(&mut reader).unwrap();
assert_eq!(dav.get_account_test(index2).unwrap(), account2);
assert_eq!(dav.get_account_test(index1).unwrap(), account1);
std::fs::remove_file(path).unwrap();
}
}
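
Note that AppendVec's visitor above persists only metadata (path, current length, file size, append offset) and rebuilds the memory map by reopening the file at the stored path, falling back to a fresh AppendVec when the file is missing. A minimal sketch of the reopen step, assuming the same memmap API used in this diff:

use memmap::MmapMut;
use std::fs::OpenOptions;
use std::io;
use std::path::Path;

// Reopen an existing backing file and re-map it, mirroring visit_bytes above.
fn remap(path: &Path) -> io::Result<MmapMut> {
    let file = OpenOptions::new().read(true).write(true).create(false).open(path)?;
    // Safety: mirrors the unsafe map_mut call in AppendVec::new.
    unsafe { MmapMut::map_mut(&file) }
}

fn main() {
    // With no backing file present, this fails; AppendVec falls back to AppendVec::new.
    assert!(remap(Path::new("does-not-exist")).is_err());
}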


@ -3,17 +3,20 @@
//! on behalf of the caller, and a low-level API for when they have
//! already been signed and verified.
use crate::accounts::{AccountLockType, Accounts};
use crate::accounts_db::{ErrorCounters, InstructionAccounts, InstructionLoaders};
use crate::accounts_db::{AccountsDB, ErrorCounters, InstructionAccounts, InstructionLoaders};
use crate::accounts_index::Fork;
use crate::blockhash_queue::BlockhashQueue;
use crate::epoch_schedule::EpochSchedule;
use crate::locked_accounts_results::LockedAccountsResults;
use crate::message_processor::{MessageProcessor, ProcessInstruction};
use crate::serde_utils::{
deserialize_atomicbool, deserialize_atomicusize, serialize_atomicbool, serialize_atomicusize,
};
use crate::stakes::Stakes;
use crate::status_cache::StatusCache;
use bincode::serialize;
use hashbrown::HashMap;
use bincode::{deserialize_from, serialize, serialize_into, serialized_size};
use log::*;
use serde::{Deserialize, Serialize};
use solana_metrics::{
datapoint_info, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_info,
};
@ -31,27 +34,155 @@ use solana_sdk::timing::{duration_as_ms, duration_as_us, MAX_RECENT_BLOCKHASHES}
use solana_sdk::transaction::{Result, Transaction, TransactionError};
use std::borrow::Borrow;
use std::cmp;
use std::collections::HashMap;
use std::fmt;
use std::io::Cursor;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::Instant;
type BankStatusCache = StatusCache<Result<()>>;
/// Manager for the state of all accounts and programs after processing its entries.
#[derive(Default)]
pub struct Bank {
pub struct BankRc {
/// where all the Accounts are stored
accounts: Arc<Accounts>,
/// Previous checkpoint of this bank
parent: RwLock<Option<Arc<Bank>>>,
}
impl Serialize for BankRc {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
use serde::ser::Error;
let len = serialized_size(&*self.accounts.accounts_db).unwrap()
+ serialized_size(&*self.accounts).unwrap();
let mut buf = vec![0u8; len as usize];
let mut wr = Cursor::new(&mut buf[..]);
serialize_into(&mut wr, &*self.accounts).map_err(Error::custom)?;
serialize_into(&mut wr, &*self.accounts.accounts_db).map_err(Error::custom)?;
let len = wr.position() as usize;
serializer.serialize_bytes(&wr.into_inner()[..len])
}
}
struct BankRcVisitor;
impl<'a> serde::de::Visitor<'a> for BankRcVisitor {
type Value = BankRc;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Expecting BankRc")
}
#[allow(clippy::mutex_atomic)]
fn visit_bytes<E>(self, data: &[u8]) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
use serde::de::Error;
let mut rd = Cursor::new(&data[..]);
let mut accounts: Accounts = deserialize_from(&mut rd).map_err(Error::custom)?;
let accounts_db: AccountsDB = deserialize_from(&mut rd).map_err(Error::custom)?;
accounts.accounts_db = Arc::new(accounts_db);
Ok(BankRc {
accounts: Arc::new(accounts),
parent: RwLock::new(None),
})
}
}
impl<'de> Deserialize<'de> for BankRc {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: ::serde::Deserializer<'de>,
{
deserializer.deserialize_bytes(BankRcVisitor)
}
}
#[derive(Default)]
pub struct StatusCacheRc {
/// where all the Accounts are stored
/// A cache of signature statuses
status_cache: Arc<RwLock<BankStatusCache>>,
}
impl Serialize for StatusCacheRc {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
use serde::ser::Error;
let len = serialized_size(&*self.status_cache).unwrap();
let mut buf = vec![0u8; len as usize];
let mut wr = Cursor::new(&mut buf[..]);
{
let mut status_cache = self.status_cache.write().unwrap();
serialize_into(&mut wr, &*status_cache).map_err(Error::custom)?;
status_cache.merge_caches();
}
let len = wr.position() as usize;
serializer.serialize_bytes(&wr.into_inner()[..len])
}
}
struct StatusCacheRcVisitor;
impl<'a> serde::de::Visitor<'a> for StatusCacheRcVisitor {
type Value = StatusCacheRc;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Expecting StatusCacheRc")
}
#[allow(clippy::mutex_atomic)]
fn visit_bytes<E>(self, data: &[u8]) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
use serde::de::Error;
let mut rd = Cursor::new(&data[..]);
let status_cache: BankStatusCache = deserialize_from(&mut rd).map_err(Error::custom)?;
Ok(StatusCacheRc {
status_cache: Arc::new(RwLock::new(status_cache)),
})
}
}
impl<'de> Deserialize<'de> for StatusCacheRc {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: ::serde::Deserializer<'de>,
{
deserializer.deserialize_bytes(StatusCacheRcVisitor)
}
}
impl StatusCacheRc {
pub fn append(&self, status_cache_rc: &StatusCacheRc) {
let sc = status_cache_rc.status_cache.write().unwrap();
self.status_cache.write().unwrap().append(&sc);
}
}
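// append() above folds the other handle's unsaved slot (cache[1]) into this
// cache via StatusCache::append (defined in status_cache.rs), so signature
// statuses recorded on a snapshotted copy can be merged into a rebuilt bank.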
/// Manager for the state of all accounts and programs after processing its entries.
#[derive(Default, Deserialize, Serialize)]
pub struct Bank {
/// References to accounts, parent and signature status
#[serde(skip)]
pub rc: BankRc,
#[serde(skip)]
pub src: StatusCacheRc,
/// FIFO queue of `recent_blockhash` items
blockhash_queue: RwLock<BlockhashQueue>,
/// Previous checkpoint of this bank
parent: RwLock<Option<Arc<Bank>>>,
/// The set of parents including this bank
pub ancestors: HashMap<u64, usize>,
@ -62,9 +193,13 @@ pub struct Bank {
parent_hash: Hash,
/// The number of transactions processed without error
#[serde(serialize_with = "serialize_atomicusize")]
#[serde(deserialize_with = "deserialize_atomicusize")]
transaction_count: AtomicUsize, // TODO: Use AtomicU64 if/when available
/// Bank tick height
#[serde(serialize_with = "serialize_atomicusize")]
#[serde(deserialize_with = "deserialize_atomicusize")]
tick_height: AtomicUsize, // TODO: Use AtomicU64 if/when available
// Bank max_tick_height
@ -97,6 +232,8 @@ pub struct Bank {
/// A boolean reflecting whether any entries were recorded into the PoH
/// stream for the slot == self.slot
#[serde(serialize_with = "serialize_atomicbool")]
#[serde(deserialize_with = "deserialize_atomicbool")]
is_delta: AtomicBool,
/// The Message processor
@ -117,7 +254,7 @@ impl Bank {
pub fn new_with_paths(genesis_block: &GenesisBlock, paths: Option<String>) -> Self {
let mut bank = Self::default();
bank.ancestors.insert(bank.slot(), 0);
bank.accounts = Arc::new(Accounts::new(paths));
bank.rc.accounts = Arc::new(Accounts::new(paths));
bank.process_genesis_block(genesis_block);
// genesis needs stakes for all epochs up to the epoch implied by
// slot = 0 and genesis configuration
@ -137,7 +274,7 @@ impl Bank {
let mut bank = Self::default();
bank.blockhash_queue = RwLock::new(parent.blockhash_queue.read().unwrap().clone());
bank.status_cache = parent.status_cache.clone();
bank.src.status_cache = parent.src.status_cache.clone();
bank.bank_height = parent.bank_height + 1;
bank.fee_calculator = parent.fee_calculator.clone();
@ -159,11 +296,11 @@ impl Bank {
("bank_height", bank.bank_height, i64)
);
bank.parent = RwLock::new(Some(parent.clone()));
bank.rc.parent = RwLock::new(Some(parent.clone()));
bank.parent_hash = parent.hash();
bank.collector_id = *collector_id;
bank.accounts = Arc::new(Accounts::new_from_parent(&parent.accounts));
bank.rc.accounts = Arc::new(Accounts::new_from_parent(&parent.rc.accounts));
bank.epoch_stakes = {
let mut epoch_stakes = parent.epoch_stakes.clone();
@ -256,19 +393,19 @@ impl Bank {
self.freeze();
let parents = self.parents();
*self.parent.write().unwrap() = None;
*self.rc.parent.write().unwrap() = None;
let squash_accounts_start = Instant::now();
for p in parents.iter().rev() {
// root forks cannot be purged
self.accounts.add_root(p.slot());
self.rc.accounts.add_root(p.slot());
}
let squash_accounts_ms = duration_as_ms(&squash_accounts_start.elapsed());
let squash_cache_start = Instant::now();
parents
.iter()
.for_each(|p| self.status_cache.write().unwrap().add_root(p.slot()));
.for_each(|p| self.src.status_cache.write().unwrap().add_root(p.slot()));
let squash_cache_ms = duration_as_ms(&squash_cache_start.elapsed());
datapoint_info!(
@ -280,7 +417,7 @@ impl Bank {
/// Return the more recent checkpoint of this bank instance.
pub fn parent(&self) -> Option<Arc<Bank>> {
self.parent.read().unwrap().clone()
self.rc.parent.read().unwrap().clone()
}
fn process_genesis_block(&mut self, genesis_block: &GenesisBlock) {
@ -356,7 +493,7 @@ impl Bank {
/// Forget all signatures. Useful for benchmarking.
pub fn clear_signatures(&self) {
self.status_cache.write().unwrap().clear_signatures();
self.src.status_cache.write().unwrap().clear_signatures();
}
pub fn can_commit(result: &Result<()>) -> bool {
@ -368,7 +505,7 @@ impl Bank {
}
fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) {
let mut status_cache = self.status_cache.write().unwrap();
let mut status_cache = self.src.status_cache.write().unwrap();
for (i, tx) in txs.iter().enumerate() {
if Self::can_commit(&res[i]) && !tx.signatures.is_empty() {
status_cache.insert(
@ -453,7 +590,7 @@ impl Bank {
}
// TODO: put this assert back in
// assert!(!self.is_frozen());
let results = self.accounts.lock_accounts(txs);
let results = self.rc.accounts.lock_accounts(txs);
LockedAccountsResults::new(results, &self, txs, AccountLockType::AccountLock)
}
@ -464,11 +601,12 @@ impl Bank {
if locked_accounts_results.needs_unlock {
locked_accounts_results.needs_unlock = false;
match locked_accounts_results.lock_type() {
AccountLockType::AccountLock => self.accounts.unlock_accounts(
AccountLockType::AccountLock => self.rc.accounts.unlock_accounts(
locked_accounts_results.transactions(),
locked_accounts_results.locked_accounts_results(),
),
AccountLockType::RecordLock => self
.rc
.accounts
.unlock_record_accounts(locked_accounts_results.transactions()),
}
@ -482,12 +620,12 @@ impl Bank {
where
I: std::borrow::Borrow<Transaction>,
{
self.accounts.lock_record_accounts(txs);
self.rc.accounts.lock_record_accounts(txs);
LockedAccountsResults::new(vec![], &self, txs, AccountLockType::RecordLock)
}
pub fn unlock_record_accounts(&self, txs: &[Transaction]) {
self.accounts.unlock_record_accounts(txs)
self.rc.accounts.unlock_record_accounts(txs)
}
fn load_accounts(
@ -496,7 +634,7 @@ impl Bank {
results: Vec<Result<()>>,
error_counters: &mut ErrorCounters,
) -> Vec<Result<(InstructionAccounts, InstructionLoaders)>> {
self.accounts.load_accounts(
self.rc.accounts.load_accounts(
&self.ancestors,
txs,
results,
@ -550,7 +688,7 @@ impl Bank {
lock_results: Vec<Result<()>>,
error_counters: &mut ErrorCounters,
) -> Vec<Result<()>> {
let rcache = self.status_cache.read().unwrap();
let rcache = self.src.status_cache.read().unwrap();
txs.iter()
.zip(lock_results.into_iter())
.map(|(tx, lock_res)| {
@ -770,7 +908,8 @@ impl Bank {
// TODO: put this assert back in
// assert!(!self.is_frozen());
let now = Instant::now();
self.accounts
self.rc
.accounts
.store_accounts(self.slot(), txs, executed, loaded_accounts);
self.store_stakes(txs, executed, loaded_accounts);
@ -838,7 +977,7 @@ impl Bank {
}
fn store(&self, pubkey: &Pubkey, account: &Account) {
self.accounts.store_slow(self.slot(), pubkey, account);
self.rc.accounts.store_slow(self.slot(), pubkey, account);
if Stakes::is_stake(account) {
self.stakes.write().unwrap().store(pubkey, account);
@ -867,8 +1006,22 @@ impl Bank {
self.store(pubkey, &account);
}
pub fn accounts(&self) -> Arc<Accounts> {
self.rc.accounts.clone()
}
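/// Reattach state that does not travel with a serialized Bank: `rc` and
/// `src` are `#[serde(skip)]`, so after deserializing a Bank the caller
/// supplies the accounts and status cache separately (see
/// test_bank_serialize below for the round trip).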
pub fn set_bank_rc(&mut self, bank_rc: &BankRc, status_cache_rc: &StatusCacheRc) {
self.rc.accounts = bank_rc.accounts.clone();
self.src.status_cache = status_cache_rc.status_cache.clone()
}
pub fn set_parent(&mut self, parent: &Arc<Bank>) {
self.rc.parent = RwLock::new(Some(parent.clone()));
}
pub fn get_account(&self, pubkey: &Pubkey) -> Option<Account> {
self.accounts
self.rc
.accounts
.load_slow(&self.ancestors, pubkey)
.map(|(account, _)| account)
}
@ -877,12 +1030,12 @@ impl Bank {
&self,
program_id: &Pubkey,
) -> Vec<(Pubkey, Account)> {
self.accounts.load_by_program(self.slot(), program_id)
self.rc.accounts.load_by_program(self.slot(), program_id)
}
pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option<(Account, Fork)> {
let just_self: HashMap<u64, usize> = vec![(self.slot(), 0)].into_iter().collect();
self.accounts.load_slow(&just_self, pubkey)
self.rc.accounts.load_slow(&just_self, pubkey)
}
pub fn transaction_count(&self) -> u64 {
@ -897,7 +1050,7 @@ impl Bank {
&self,
signature: &Signature,
) -> Option<(usize, Result<()>)> {
let rcache = self.status_cache.read().unwrap();
let rcache = self.src.status_cache.read().unwrap();
rcache.get_signature_status_slow(signature, &self.ancestors)
}
@ -915,11 +1068,11 @@ impl Bank {
fn hash_internal_state(&self) -> Hash {
// If there are no accounts, return the same hash as we did before
// checkpointing.
if !self.accounts.has_accounts(self.slot()) {
if !self.rc.accounts.has_accounts(self.slot()) {
return self.parent_hash;
}
let accounts_delta_hash = self.accounts.hash_internal_state(self.slot());
let accounts_delta_hash = self.rc.accounts.hash_internal_state(self.slot());
extend_and_hash(&self.parent_hash, &serialize(&accounts_delta_hash).unwrap())
}
@ -1016,12 +1169,48 @@ impl Bank {
// Register a bogus executable account, which will be loaded and ignored.
self.register_native_instruction_processor("", &program_id);
}
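/// Snapshot test helper: asserts field-by-field equivalence between this
/// bank and one reconstructed by deserialization.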
pub fn compare_bank(&self, dbank: &Bank) {
assert_eq!(self.slot, dbank.slot);
assert_eq!(self.collector_id, dbank.collector_id);
assert_eq!(self.epoch_schedule, dbank.epoch_schedule);
assert_eq!(self.ticks_per_slot, dbank.ticks_per_slot);
assert_eq!(self.parent_hash, dbank.parent_hash);
assert_eq!(
self.tick_height.load(Ordering::SeqCst),
dbank.tick_height.load(Ordering::SeqCst)
);
assert_eq!(
self.is_delta.load(Ordering::SeqCst),
dbank.is_delta.load(Ordering::SeqCst)
);
let st = self.stakes.read().unwrap();
let dst = dbank.stakes.read().unwrap();
assert_eq!(*st, *dst);
let bh = self.hash.read().unwrap();
let dbh = dbank.hash.read().unwrap();
assert_eq!(*bh, *dbh);
let bhq = self.blockhash_queue.read().unwrap();
let dbhq = dbank.blockhash_queue.read().unwrap();
assert_eq!(*bhq, *dbhq);
let sc = self.src.status_cache.read().unwrap();
let dsc = dbank.src.status_cache.read().unwrap();
assert_eq!(*sc, *dsc);
assert_eq!(
self.rc.accounts.hash_internal_state(self.slot),
dbank.rc.accounts.hash_internal_state(dbank.slot)
);
}
}
impl Drop for Bank {
fn drop(&mut self) {
// For root forks this is a noop
self.accounts.purge_fork(self.slot());
self.rc.accounts.purge_fork(self.slot());
}
}
@ -1032,6 +1221,7 @@ mod tests {
use crate::genesis_utils::{
create_genesis_block_with_leader, GenesisBlockInfo, BOOTSTRAP_LEADER_LAMPORTS,
};
use bincode::{deserialize_from, serialize_into, serialized_size};
use solana_sdk::genesis_block::create_genesis_block;
use solana_sdk::hash;
use solana_sdk::instruction::InstructionError;
@ -1040,6 +1230,7 @@ mod tests {
use solana_sdk::system_transaction;
use solana_vote_api::vote_instruction;
use solana_vote_api::vote_state::VoteState;
use std::io::Cursor;
#[test]
fn test_bank_new() {
@ -1999,4 +2190,28 @@ mod tests {
assert!(bank.is_delta.load(Ordering::Relaxed));
}
#[test]
fn test_bank_serialize() {
let (genesis_block, _) = create_genesis_block(500);
let bank0 = Arc::new(Bank::new(&genesis_block));
let bank = new_from_parent(&bank0);
// Test new account
let key = Keypair::new();
bank.deposit(&key.pubkey(), 10);
assert_eq!(bank.get_balance(&key.pubkey()), 10);
let len = serialized_size(&bank).unwrap() + serialized_size(&bank.rc).unwrap();
let mut buf = vec![0u8; len as usize];
let mut writer = Cursor::new(&mut buf[..]);
serialize_into(&mut writer, &bank).unwrap();
serialize_into(&mut writer, &bank.rc).unwrap();
let mut reader = Cursor::new(&mut buf[..]);
let mut dbank: Bank = deserialize_from(&mut reader).unwrap();
let dbank_rc: BankRc = deserialize_from(&mut reader).unwrap();
dbank.rc = dbank_rc;
assert_eq!(dbank.get_balance(&key.pubkey()), 10);
bank.compare_bank(&dbank);
}
}

View File

@ -1,15 +1,16 @@
use hashbrown::HashMap;
use serde::{Deserialize, Serialize};
use solana_sdk::hash::Hash;
use solana_sdk::timing::timestamp;
use std::collections::HashMap;
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
struct HashAge {
timestamp: u64,
hash_height: u64,
}
/// Low memory overhead, so can be cloned for every checkpoint
#[derive(Clone)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct BlockhashQueue {
/// updated whenever a hash is registered
hash_height: u64,

View File

@ -16,7 +16,8 @@ pub trait BloomHashIndex {
#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)]
pub struct Bloom<T: BloomHashIndex> {
pub keys: Vec<u64>,
pub bits: BitVec<u8>,
pub bits: BitVec<u64>,
num_bits_set: u64,
_phantom: PhantomData<T>,
}
@ -26,6 +27,7 @@ impl<T: BloomHashIndex> Bloom<T> {
Bloom {
keys,
bits,
num_bits_set: 0,
_phantom: PhantomData::default(),
}
}
@ -47,11 +49,15 @@ impl<T: BloomHashIndex> Bloom<T> {
}
pub fn clear(&mut self) {
self.bits = BitVec::new_fill(false, self.bits.len());
self.num_bits_set = 0;
}
pub fn add(&mut self, key: &T) {
for k in &self.keys {
let pos = self.pos(key, *k);
self.bits.set(pos, true);
if !self.bits.get(pos) {
self.num_bits_set += 1;
self.bits.set(pos, true);
}
}
}
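// Sketch (not part of this commit): because add() now maintains
// num_bits_set incrementally, the filter's fill ratio is an O(1) read
// rather than a scan of the whole BitVec.
#[allow(dead_code)]
pub fn fill_ratio(&self) -> f64 {
self.num_bits_set as f64 / self.bits.len() as f64
}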
pub fn contains(&self, key: &T) -> bool {

View File

@ -2,7 +2,7 @@ use solana_vote_api::vote_state::MAX_LOCKOUT_HISTORY;
pub const MINIMUM_SLOT_LENGTH: usize = MAX_LOCKOUT_HISTORY + 1;
#[derive(Default, Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Default, Debug, PartialEq, Eq, Clone, Copy, Deserialize, Serialize)]
pub struct EpochSchedule {
/// The maximum number of slots in each epoch.
pub slots_per_epoch: u64,

View File

@ -12,8 +12,9 @@ pub mod loader_utils;
pub mod locked_accounts_results;
pub mod message_processor;
mod native_loader;
mod serde_utils;
pub mod stakes;
mod status_cache;
pub mod status_cache;
mod system_instruction_processor;
#[macro_use]

View File

@ -1,5 +1,6 @@
use crate::native_loader;
use crate::system_instruction_processor;
use serde::{Deserialize, Serialize};
use solana_sdk::account::{create_keyed_accounts, Account, KeyedAccount};
use solana_sdk::instruction::{CompiledInstruction, InstructionError};
use solana_sdk::instruction_processor_utils;
@ -85,8 +86,11 @@ pub type ProcessInstruction =
pub type SymbolCache = RwLock<HashMap<Vec<u8>, Symbol<instruction_processor_utils::Entrypoint>>>;
#[derive(Serialize, Deserialize)]
pub struct MessageProcessor {
#[serde(skip)]
instruction_processors: Vec<(Pubkey, ProcessInstruction)>,
#[serde(skip)]
symbol_cache: SymbolCache,
}
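// Both fields are #[serde(skip)] by necessity: instruction_processors holds
// raw function pointers and symbol_cache holds dynamically loaded symbols,
// neither of which can be serialized. Skipped fields are rebuilt as empty
// defaults on deserialization, so a restored MessageProcessor re-resolves
// programs at execution time rather than carrying them in the snapshot.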

View File

@ -0,0 +1,62 @@
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
struct U64Visitor;
impl<'a> serde::de::Visitor<'a> for U64Visitor {
type Value = u64;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Expecting u64")
}
fn visit_u64<E>(self, data: u64) -> std::result::Result<u64, E>
where
E: serde::de::Error,
{
Ok(data)
}
}
pub fn deserialize_atomicusize<'de, D>(d: D) -> Result<AtomicUsize, D::Error>
where
D: serde::de::Deserializer<'de>,
{
let value = d.deserialize_u64(U64Visitor)?;
Ok(AtomicUsize::new(value as usize))
}
pub fn serialize_atomicusize<S>(x: &AtomicUsize, s: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
s.serialize_u64(x.load(Ordering::SeqCst) as u64)
}
struct BoolVisitor;
impl<'a> serde::de::Visitor<'a> for BoolVisitor {
type Value = bool;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Expecting bool")
}
fn visit_bool<E>(self, data: bool) -> std::result::Result<bool, E>
where
E: serde::de::Error,
{
Ok(data)
}
}
pub fn deserialize_atomicbool<'de, D>(d: D) -> Result<AtomicBool, D::Error>
where
D: serde::de::Deserializer<'de>,
{
let value = d.deserialize_bool(BoolVisitor)?;
Ok(AtomicBool::new(value))
}
pub fn serialize_atomicbool<S>(x: &AtomicBool, s: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
s.serialize_bool(x.load(Ordering::SeqCst))
}
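// Usage sketch (hypothetical struct, not from this commit): the helpers
// above are meant to be wired into serde field attributes, which is how
// Bank serializes its AtomicUsize and AtomicBool fields.
#[cfg(test)]
mod usage_sketch {
use super::*;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
#[allow(dead_code)]
struct Counter {
#[serde(serialize_with = "serialize_atomicusize")]
#[serde(deserialize_with = "deserialize_atomicusize")]
hits: AtomicUsize,
}
}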

View File

@ -1,11 +1,11 @@
//! Stakes serve as a cache of stake and vote accounts to derive
//! node stakes
use hashbrown::HashMap;
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use solana_stake_api::stake_state::StakeState;
use std::collections::HashMap;
#[derive(Default, Clone)]
#[derive(Default, Clone, PartialEq, Debug, Deserialize, Serialize)]
pub struct Stakes {
/// vote accounts
vote_accounts: HashMap<Pubkey, (u64, Account)>,

View File

@ -1,8 +1,10 @@
use hashbrown::{HashMap, HashSet};
use log::*;
use rand::{thread_rng, Rng};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Serialize};
use solana_sdk::hash::Hash;
use solana_sdk::signature::Signature;
use std::collections::{HashMap, HashSet};
const MAX_CACHE_ENTRIES: usize = solana_sdk::timing::MAX_HASH_AGE_IN_SECONDS;
const CACHED_SIGNATURE_SIZE: usize = 20;
@ -14,22 +16,36 @@ type SignatureSlice = [u8; CACHED_SIGNATURE_SIZE];
type SignatureMap<T> = HashMap<SignatureSlice, ForkStatus<T>>;
type StatusMap<T> = HashMap<Hash, (ForkId, usize, SignatureMap<T>)>;
pub struct StatusCache<T: Clone> {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusCache<T: Serialize + Clone> {
/// all signatures seen during a hash period
cache: StatusMap<T>,
#[serde(serialize_with = "serialize_statusmap")]
cache: Vec<StatusMap<T>>,
roots: HashSet<ForkId>,
}
impl<T: Clone> Default for StatusCache<T> {
fn serialize_statusmap<S, T: Serialize>(x: &[StatusMap<T>], s: S) -> Result<S::Ok, S::Error>
where
T: serde::Serialize + Clone,
S: serde::Serializer,
{
let cache0: StatusMap<T> = HashMap::new();
let mut seq = s.serialize_seq(Some(x.len()))?;
seq.serialize_element(&cache0)?;
seq.serialize_element(&x[1])?;
seq.end()
}
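// Layout assumed by the serializer above: cache[0] is the saved copy
// (entries already folded in by merge_caches) and cache[1] is the unsaved
// delta since the last snapshot. Slot 0 is written out empty, so a
// serialized status cache carries only the delta; StatusCacheRc::serialize
// in bank.rs calls merge_caches() immediately after writing.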
impl<T: Serialize + Clone> Default for StatusCache<T> {
fn default() -> Self {
Self {
cache: HashMap::default(),
cache: vec![HashMap::default(); 2],
roots: HashSet::default(),
}
}
}
impl<T: Clone> StatusCache<T> {
impl<T: Serialize + Clone> StatusCache<T> {
/// Check if the signature from a transaction is in any of the forks in the ancestors set.
pub fn get_signature_status(
&self,
@ -37,15 +53,26 @@ impl<T: Clone> StatusCache<T> {
transaction_blockhash: &Hash,
ancestors: &HashMap<ForkId, usize>,
) -> Option<(ForkId, T)> {
let (_, index, sigmap) = self.cache.get(transaction_blockhash)?;
let mut sig_slice = [0u8; CACHED_SIGNATURE_SIZE];
sig_slice.clone_from_slice(&sig.as_ref()[*index..*index + CACHED_SIGNATURE_SIZE]);
let stored_forks = sigmap.get(&sig_slice)?;
stored_forks
.iter()
.filter(|(f, _)| ancestors.get(f).is_some() || self.roots.get(f).is_some())
.nth(0)
.cloned()
for cache in self.cache.iter() {
let map = cache.get(transaction_blockhash);
if map.is_none() {
continue;
}
let (_, index, sigmap) = map.unwrap();
let mut sig_slice = [0u8; CACHED_SIGNATURE_SIZE];
sig_slice.clone_from_slice(&sig.as_ref()[*index..*index + CACHED_SIGNATURE_SIZE]);
if let Some(stored_forks) = sigmap.get(&sig_slice) {
let res = stored_forks
.iter()
.filter(|(f, _)| ancestors.get(f).is_some() || self.roots.get(f).is_some())
.nth(0)
.cloned();
if res.is_some() {
return res;
}
}
}
None
}
/// TODO: wallets should send the Transactions recent blockhash as well
@ -55,7 +82,12 @@ impl<T: Clone> StatusCache<T> {
ancestors: &HashMap<ForkId, usize>,
) -> Option<(usize, T)> {
trace!("get_signature_status_slow");
for blockhash in self.cache.keys() {
let mut keys = vec![];
for cache in self.cache.iter() {
let mut val: Vec<_> = cache.iter().map(|(k, _)| *k).collect();
keys.append(&mut val);
}
for blockhash in keys.iter() {
trace!("get_signature_status_slow: trying {}", blockhash);
if let Some((forkid, res)) = self.get_signature_status(sig, blockhash, ancestors) {
trace!("get_signature_status_slow: got {}", forkid);
@ -75,31 +107,80 @@ impl<T: Clone> StatusCache<T> {
if self.roots.len() > MAX_CACHE_ENTRIES {
if let Some(min) = self.roots.iter().min().cloned() {
self.roots.remove(&min);
self.cache.retain(|_, (fork, _, _)| *fork > min);
for cache in self.cache.iter_mut() {
cache.retain(|_, (fork, _, _)| *fork > min);
}
}
}
}
/// Insert a new signature for a specific fork.
pub fn insert(&mut self, transaction_blockhash: &Hash, sig: &Signature, fork: ForkId, res: T) {
let index: usize =
thread_rng().gen_range(0, std::mem::size_of::<Hash>() - CACHED_SIGNATURE_SIZE);
let sig_map =
self.cache
.entry(*transaction_blockhash)
.or_insert((fork, index, HashMap::new()));
let sig_index: usize;
if let Some(sig_map) = self.cache[0].get(transaction_blockhash) {
sig_index = sig_map.1;
} else {
sig_index =
thread_rng().gen_range(0, std::mem::size_of::<Hash>() - CACHED_SIGNATURE_SIZE);
}
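// sig_index above reuses the slice offset already chosen for this
// blockhash in the saved copy (cache[0]) so both copies index signatures
// identically; only a brand-new blockhash draws a fresh random offset.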
let sig_map = self.cache[1].entry(*transaction_blockhash).or_insert((
fork,
sig_index,
HashMap::new(),
));
sig_map.0 = std::cmp::max(fork, sig_map.0);
let index = sig_map.1;
let mut sig_slice = [0u8; CACHED_SIGNATURE_SIZE];
sig_slice.clone_from_slice(&sig.as_ref()[index..index + CACHED_SIGNATURE_SIZE]);
let sig_forks = sig_map.2.entry(sig_slice).or_insert(vec![]);
let sig_forks = sig_map.2.entry(sig_slice).or_insert_with(|| vec![]);
sig_forks.push((fork, res));
}
fn insert_entry(
&mut self,
transaction_blockhash: &Hash,
sig_slice: &[u8; CACHED_SIGNATURE_SIZE],
status: Vec<(ForkId, T)>,
index: usize,
) {
let fork = status
.iter()
.fold(0, |acc, (f, _)| if acc > *f { acc } else { *f });
let sig_map =
self.cache[0]
.entry(*transaction_blockhash)
.or_insert((fork, index, HashMap::new()));
sig_map.0 = std::cmp::max(fork, sig_map.0);
let sig_forks = sig_map.2.entry(*sig_slice).or_insert_with(|| vec![]);
sig_forks.extend(status);
}
/// Clear for testing
pub fn clear_signatures(&mut self) {
for v in self.cache.values_mut() {
v.2 = HashMap::new();
for cache in self.cache.iter_mut() {
for v in cache.values_mut() {
v.2 = HashMap::new();
}
}
}
pub fn append(&mut self, status_cache: &StatusCache<T>) {
for (hash, sigmap) in status_cache.cache[1].iter() {
for (signature, fork_status) in sigmap.2.iter() {
self.insert_entry(hash, signature, fork_status.clone(), sigmap.1);
}
}
self.roots = self.roots.union(&status_cache.roots).cloned().collect();
}
pub fn merge_caches(&mut self) {
let mut cache = HashMap::new();
std::mem::swap(&mut cache, &mut self.cache[1]);
for (hash, sigmap) in cache.iter() {
for (signature, fork_status) in sigmap.2.iter() {
self.insert_entry(hash, signature, fork_status.clone(), sigmap.1);
}
}
}
}
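// Lifecycle sketch tying the two slots together (hypothetical test, not
// part of this commit): a status lands in the unsaved slot, serialization
// persists only that delta, and merge_caches then folds it into the saved
// slot.
#[cfg(test)]
#[allow(dead_code)]
fn snapshot_lifecycle_sketch() {
let mut sc: StatusCache<()> = StatusCache::default();
sc.insert(&Hash::default(), &Signature::default(), 0, ()); // goes to cache[1]
let bytes = bincode::serialize(&sc).unwrap(); // persists the delta only
sc.merge_caches(); // delta now lives in cache[0]; cache[1] is drained
assert!(!bytes.is_empty());
}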
@ -107,7 +188,9 @@ impl<T: Clone> StatusCache<T> {
#[cfg(test)]
mod tests {
use super::*;
use bincode::{deserialize_from, serialize_into, serialized_size};
use solana_sdk::hash::hash;
use std::io::Cursor;
type BankStatusCache = StatusCache<()>;
@ -260,9 +343,91 @@ mod tests {
let blockhash = hash(Hash::default().as_ref());
status_cache.clear_signatures();
status_cache.insert(&blockhash, &sig, 0, ());
let (_, index, sig_map) = status_cache.cache.get(&blockhash).unwrap();
let (_, index, sig_map) = status_cache.cache[1].get(&blockhash).unwrap();
let mut sig_slice = [0u8; CACHED_SIGNATURE_SIZE];
sig_slice.clone_from_slice(&sig.as_ref()[*index..*index + CACHED_SIGNATURE_SIZE]);
assert!(sig_map.get(&sig_slice).is_some());
}
#[test]
fn test_statuscache_append() {
let sig = Signature::default();
let mut status_cache0 = BankStatusCache::default();
let blockhash0 = hash(Hash::new(&vec![0; 32]).as_ref());
status_cache0.add_root(0);
status_cache0.insert(&blockhash0, &sig, 0, ());
let sig = Signature::default();
let mut status_cache1 = BankStatusCache::default();
let blockhash1 = hash(Hash::new(&vec![1; 32]).as_ref());
status_cache1.insert(&blockhash0, &sig, 1, ());
status_cache1.add_root(1);
status_cache1.insert(&blockhash1, &sig, 1, ());
status_cache0.append(&status_cache1);
let roots: HashSet<_> = [0, 1].iter().cloned().collect();
assert_eq!(status_cache0.roots, roots);
let ancestors = vec![(0, 1), (1, 1)].into_iter().collect();
assert!(status_cache0
.get_signature_status(&sig, &blockhash0, &ancestors)
.is_some());
assert!(status_cache0
.get_signature_status(&sig, &blockhash1, &ancestors)
.is_some());
}
fn test_serialize(sc: &mut BankStatusCache, blockhash: Vec<Hash>, sig: &Signature) {
let len = serialized_size(&sc).unwrap();
let mut buf = vec![0u8; len as usize];
let mut writer = Cursor::new(&mut buf[..]);
let cache0 = sc.cache[0].clone();
serialize_into(&mut writer, sc).unwrap();
for hash in blockhash.iter() {
if let Some(map0) = sc.cache[0].get(hash) {
if let Some(map1) = sc.cache[1].get(hash) {
assert_eq!(map0.1, map1.1);
}
}
}
sc.merge_caches();
let len = writer.position() as usize;
let mut reader = Cursor::new(&mut buf[..len]);
let mut status_cache: BankStatusCache = deserialize_from(&mut reader).unwrap();
status_cache.cache[0] = cache0;
status_cache.merge_caches();
assert!(status_cache.cache[0].len() > 0);
assert!(status_cache.cache[1].is_empty());
let ancestors = vec![(0, 1), (1, 1)].into_iter().collect();
assert_eq!(*sc, status_cache);
for hash in blockhash.iter() {
assert!(status_cache
.get_signature_status(&sig, &hash, &ancestors)
.is_some());
}
}
#[test]
fn test_statuscache_serialize() {
let sig = Signature::default();
let mut status_cache = BankStatusCache::default();
let blockhash0 = hash(Hash::new(&vec![0; 32]).as_ref());
status_cache.add_root(0);
status_cache.clear_signatures();
status_cache.insert(&blockhash0, &sig, 0, ());
test_serialize(&mut status_cache, vec![blockhash0], &sig);
status_cache.insert(&blockhash0, &sig, 1, ());
test_serialize(&mut status_cache, vec![blockhash0], &sig);
let blockhash1 = hash(Hash::new(&vec![1; 32]).as_ref());
status_cache.insert(&blockhash1, &sig, 1, ());
test_serialize(&mut status_cache, vec![blockhash0, blockhash1], &sig);
let blockhash2 = hash(Hash::new(&vec![2; 32]).as_ref());
let ancestors = vec![(0, 1), (1, 1)].into_iter().collect();
assert!(status_cache
.get_signature_status(&sig, &blockhash2, &ancestors)
.is_none());
}
}

View File

@ -156,7 +156,14 @@ fn main() {
.validator(port_range_validator)
.help("Range to use for dynamically assigned ports"),
)
.get_matches();
.arg(
clap::Arg::with_name("snapshot_path")
.long("snapshot-path")
.value_name("PATHS")
.takes_value(true)
.help("Snapshot path"),
)
.get_matches();
let mut validator_config = ValidatorConfig::default();
let keypair = if let Some(identity) = matches.value_of("identity") {
@ -220,6 +227,11 @@ fn main() {
} else {
validator_config.account_paths = None;
}
if let Some(paths) = matches.value_of("snapshot_path") {
validator_config.snapshot_path = Some(paths.to_string());
} else {
validator_config.snapshot_path = None;
}
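// Hypothetical invocation (path is an example only):
//   --snapshot-path /var/solana/snapshots
// When the flag is omitted, snapshot_path stays None and BankForks skips
// writing bank snapshots.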
let cluster_entrypoint = matches.value_of("entrypoint").map(|entrypoint| {
let entrypoint_addr = solana_netutil::parse_host_port(entrypoint)
.expect("failed to parse entrypoint address");