Move tower storage into its own module

This commit is contained in:
Michael Vines 2021-08-09 11:32:48 -07:00
parent d7ab510229
commit e9722474eb
13 changed files with 152 additions and 138 deletions

View File

@ -5,8 +5,7 @@ extern crate test;
use {
solana_core::{
consensus::{FileTowerStorage, Tower},
vote_simulator::VoteSimulator,
consensus::Tower, tower_storage::FileTowerStorage, vote_simulator::VoteSimulator,
},
solana_runtime::bank::Bank,
solana_runtime::bank_forks::BankForks,

View File

@ -3,6 +3,7 @@ use {
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks,
progress_map::{LockoutIntervals, ProgressMap},
tower_storage::{SavedTower, TowerStorage},
},
chrono::prelude::*,
solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_db},
@ -15,7 +16,7 @@ use {
hash::Hash,
instruction::Instruction,
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
signature::Keypair,
slot_history::{Check, SlotHistory},
},
solana_vote_program::{
@ -25,13 +26,10 @@ use {
std::{
cmp::Ordering,
collections::{HashMap, HashSet},
fs::{self, File},
io::BufReader,
ops::{
Bound::{Included, Unbounded},
Deref,
},
path::PathBuf,
},
thiserror::Error,
};
@ -107,7 +105,7 @@ pub(crate) struct ComputedBankState {
#[frozen_abi(digest = "GMs1FxKteU7K4ZFRofMBqNhBpM4xkPVxfYod6R8DQmpT")]
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)]
pub struct Tower {
node_pubkey: Pubkey,
pub(crate) node_pubkey: Pubkey,
threshold_depth: usize,
threshold_size: f64,
vote_state: VoteState,
@ -1225,122 +1223,6 @@ impl TowerError {
}
}
pub trait TowerStorage: Sync + Send {
fn load(&self, node_pubkey: &Pubkey) -> Result<SavedTower>;
fn store(&self, saved_tower: &SavedTower) -> Result<()>;
}
#[derive(Debug, Default, Clone, PartialEq)]
pub struct NullTowerStorage {}
impl TowerStorage for NullTowerStorage {
fn load(&self, _node_pubkey: &Pubkey) -> Result<SavedTower> {
Err(TowerError::WrongTower(
"NullTowerStorage has no storage".into(),
))
}
fn store(&self, _saved_tower: &SavedTower) -> Result<()> {
Ok(())
}
}
#[derive(Debug, Default, Clone, PartialEq)]
pub struct FileTowerStorage {
pub tower_path: PathBuf,
}
impl FileTowerStorage {
pub fn new(tower_path: PathBuf) -> Self {
Self { tower_path }
}
pub fn filename(&self, node_pubkey: &Pubkey) -> PathBuf {
self.tower_path
.join(format!("tower-{}", node_pubkey))
.with_extension("bin")
}
}
impl TowerStorage for FileTowerStorage {
fn load(&self, node_pubkey: &Pubkey) -> Result<SavedTower> {
let filename = self.filename(node_pubkey);
trace!("load {}", filename.display());
// Ensure to create parent dir here, because restore() precedes save() always
fs::create_dir_all(&filename.parent().unwrap())?;
let file = File::open(&filename)?;
let mut stream = BufReader::new(file);
bincode::deserialize_from(&mut stream).map_err(|e| e.into())
}
fn store(&self, saved_tower: &SavedTower) -> Result<()> {
let filename = self.filename(&saved_tower.node_pubkey);
trace!("store: {}", filename.display());
let new_filename = filename.with_extension("bin.new");
{
// overwrite anything if exists
let mut file = File::create(&new_filename)?;
bincode::serialize_into(&mut file, saved_tower)?;
// file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster!
}
fs::rename(&new_filename, &filename)?;
// self.path.parent().sync_all() hurts performance same as the above sync
Ok(())
}
}
#[frozen_abi(digest = "Gaxfwvx5MArn52mKZQgzHmDCyn5YfCuTHvp5Et3rFfpp")]
#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)]
pub struct SavedTower {
signature: Signature,
data: Vec<u8>,
#[serde(skip)]
node_pubkey: Pubkey,
}
impl SavedTower {
pub fn new<T: Signer>(tower: &Tower, keypair: &T) -> Result<Self> {
let node_pubkey = keypair.pubkey();
if tower.node_pubkey != node_pubkey {
return Err(TowerError::WrongTower(format!(
"node_pubkey is {:?} but found tower for {:?}",
node_pubkey, tower.node_pubkey
)));
}
let data = bincode::serialize(tower)?;
let signature = keypair.sign_message(&data);
Ok(Self {
signature,
data,
node_pubkey,
})
}
pub fn try_into_tower(self, node_pubkey: &Pubkey) -> Result<Tower> {
// This method assumes that `self` was just deserialized
assert_eq!(self.node_pubkey, Pubkey::default());
if !self.signature.verify(node_pubkey.as_ref(), &self.data) {
return Err(TowerError::InvalidSignature);
}
bincode::deserialize(&self.data)
.map_err(|e| e.into())
.and_then(|tower: Tower| {
if tower.node_pubkey != *node_pubkey {
return Err(TowerError::WrongTower(format!(
"node_pubkey is {:?} but found tower for {:?}",
node_pubkey, tower.node_pubkey
)));
}
Ok(tower)
})
}
}
// Given an untimely crash, tower may have roots that are not reflected in blockstore,
// or the reverse of this.
// That's because we don't impose any ordering guarantee or any kind of write barriers
@ -1391,7 +1273,8 @@ pub mod test {
use super::*;
use crate::{
fork_choice::ForkChoice, heaviest_subtree_fork_choice::SlotHashKey,
replay_stage::HeaviestForkFailures, vote_simulator::VoteSimulator,
replay_stage::HeaviestForkFailures, tower_storage::FileTowerStorage,
vote_simulator::VoteSimulator,
};
use solana_ledger::{blockstore::make_slot_entries, get_tmp_ledger_path};
use solana_runtime::bank::Bank;
@ -1408,6 +1291,7 @@ pub mod test {
collections::HashMap,
fs::{remove_file, OpenOptions},
io::{Read, Seek, SeekFrom, Write},
path::PathBuf,
sync::Arc,
};
use tempfile::TempDir;

View File

@ -53,6 +53,7 @@ pub mod sigverify_shreds;
pub mod sigverify_stage;
pub mod snapshot_packager_service;
pub mod test_validator;
pub mod tower_storage;
pub mod tpu;
pub mod tree_diff;
pub mod tvu;

View File

@ -12,8 +12,7 @@ use {
cluster_slots_service::ClusterSlotsUpdateSender,
commitment_service::{AggregateCommitmentService, CommitmentAggregationData},
consensus::{
ComputedBankState, SavedTower, Stake, SwitchForkDecision, Tower, TowerStorage,
VotedStakes, SWITCH_FORK_THRESHOLD,
ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes, SWITCH_FORK_THRESHOLD,
},
fork_choice::{ForkChoice, SelectVoteAndResetForkResult},
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
@ -21,6 +20,7 @@ use {
progress_map::{ForkProgress, ProgressMap, PropagatedStats},
repair_service::DuplicateSlotsResetReceiver,
rewards_recorder_service::RewardsRecorderSender,
tower_storage::{SavedTower, TowerStorage},
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
voting_service::VoteOp,
window_service::DuplicateSlotReceiver,
@ -2738,7 +2738,7 @@ impl ReplayStage {
pub mod tests {
use super::*;
use crate::{
consensus::{NullTowerStorage, Tower},
consensus::Tower,
progress_map::ValidatorStakeInfo,
replay_stage::ReplayStage,
tree_diff::TreeDiff,
@ -5419,7 +5419,7 @@ pub mod tests {
vote_simulator,
..
} = replay_blockstore_components(None, 10, None::<GenerateVotes>);
let tower_storage = NullTowerStorage::default();
let tower_storage = crate::tower_storage::NullTowerStorage::default();
let VoteSimulator {
mut validator_keypairs,

128
core/src/tower_storage.rs Normal file
View File

@ -0,0 +1,128 @@
use {
crate::consensus::{Result, Tower, TowerError},
solana_sdk::{
pubkey::Pubkey,
signature::{Signature, Signer},
},
std::{
fs::{self, File},
io::BufReader,
path::PathBuf,
},
};
#[frozen_abi(digest = "Gaxfwvx5MArn52mKZQgzHmDCyn5YfCuTHvp5Et3rFfpp")]
#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)]
pub struct SavedTower {
signature: Signature,
data: Vec<u8>,
#[serde(skip)]
node_pubkey: Pubkey,
}
impl SavedTower {
pub fn new<T: Signer>(tower: &Tower, keypair: &T) -> Result<Self> {
let node_pubkey = keypair.pubkey();
if tower.node_pubkey != node_pubkey {
return Err(TowerError::WrongTower(format!(
"node_pubkey is {:?} but found tower for {:?}",
node_pubkey, tower.node_pubkey
)));
}
let data = bincode::serialize(tower)?;
let signature = keypair.sign_message(&data);
Ok(Self {
signature,
data,
node_pubkey,
})
}
pub fn try_into_tower(self, node_pubkey: &Pubkey) -> Result<Tower> {
// This method assumes that `self` was just deserialized
assert_eq!(self.node_pubkey, Pubkey::default());
if !self.signature.verify(node_pubkey.as_ref(), &self.data) {
return Err(TowerError::InvalidSignature);
}
bincode::deserialize(&self.data)
.map_err(|e| e.into())
.and_then(|tower: Tower| {
if tower.node_pubkey != *node_pubkey {
return Err(TowerError::WrongTower(format!(
"node_pubkey is {:?} but found tower for {:?}",
node_pubkey, tower.node_pubkey
)));
}
Ok(tower)
})
}
}
pub trait TowerStorage: Sync + Send {
fn load(&self, node_pubkey: &Pubkey) -> Result<SavedTower>;
fn store(&self, saved_tower: &SavedTower) -> Result<()>;
}
#[derive(Debug, Default, Clone, PartialEq)]
pub struct NullTowerStorage {}
impl TowerStorage for NullTowerStorage {
fn load(&self, _node_pubkey: &Pubkey) -> Result<SavedTower> {
Err(TowerError::WrongTower(
"NullTowerStorage has no storage".into(),
))
}
fn store(&self, _saved_tower: &SavedTower) -> Result<()> {
Ok(())
}
}
#[derive(Debug, Default, Clone, PartialEq)]
pub struct FileTowerStorage {
pub tower_path: PathBuf,
}
impl FileTowerStorage {
pub fn new(tower_path: PathBuf) -> Self {
Self { tower_path }
}
pub fn filename(&self, node_pubkey: &Pubkey) -> PathBuf {
self.tower_path
.join(format!("tower-{}", node_pubkey))
.with_extension("bin")
}
}
impl TowerStorage for FileTowerStorage {
fn load(&self, node_pubkey: &Pubkey) -> Result<SavedTower> {
let filename = self.filename(node_pubkey);
trace!("load {}", filename.display());
// Ensure to create parent dir here, because restore() precedes save() always
fs::create_dir_all(&filename.parent().unwrap())?;
let file = File::open(&filename)?;
let mut stream = BufReader::new(file);
bincode::deserialize_from(&mut stream).map_err(|e| e.into())
}
fn store(&self, saved_tower: &SavedTower) -> Result<()> {
let filename = self.filename(&saved_tower.node_pubkey);
trace!("store: {}", filename.display());
let new_filename = filename.with_extension("bin.new");
{
// overwrite anything if exists
let mut file = File::create(&new_filename)?;
bincode::serialize_into(&mut file, saved_tower)?;
// file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster!
}
fs::rename(&new_filename, &filename)?;
// self.path.parent().sync_all() hurts performance same as the above sync
Ok(())
}
}

View File

@ -11,7 +11,7 @@ use crate::{
},
cluster_slots::ClusterSlots,
completed_data_sets_service::CompletedDataSetsSender,
consensus::{Tower, TowerStorage},
consensus::Tower,
cost_model::CostModel,
cost_update_service::CostUpdateService,
ledger_cleanup_service::LedgerCleanupService,
@ -22,6 +22,7 @@ use crate::{
sigverify_shreds::ShredSigVerifier,
sigverify_stage::SigVerifyStage,
snapshot_packager_service::PendingSnapshotPackage,
tower_storage::TowerStorage,
voting_service::VotingService,
};
use crossbeam_channel::unbounded;
@ -455,7 +456,7 @@ pub mod tests {
)),
&poh_recorder,
tower,
Arc::new(crate::consensus::FileTowerStorage::default()),
Arc::new(crate::tower_storage::FileTowerStorage::default()),
&leader_schedule_cache,
&exit,
block_commitment_cache,

View File

@ -5,7 +5,7 @@ use crate::{
cache_block_meta_service::{CacheBlockMetaSender, CacheBlockMetaService},
cluster_info_vote_listener::VoteTracker,
completed_data_sets_service::CompletedDataSetsService,
consensus::{reconcile_blockstore_roots_with_tower, FileTowerStorage, Tower, TowerStorage},
consensus::{reconcile_blockstore_roots_with_tower, Tower},
cost_model::CostModel,
rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
sample_performance_service::SamplePerformanceService,
@ -13,6 +13,7 @@ use crate::{
serve_repair_service::ServeRepairService,
sigverify,
snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService},
tower_storage::{FileTowerStorage, TowerStorage},
tpu::{Tpu, DEFAULT_TPU_COALESCE_MS},
tvu::{Sockets, Tvu, TvuConfig},
};

View File

@ -1,4 +1,4 @@
use crate::consensus::{SavedTower, TowerStorage};
use crate::tower_storage::{SavedTower, TowerStorage};
use solana_gossip::cluster_info::ClusterInfo;
use solana_measure::measure::Measure;
use solana_poh::poh_recorder::PohRecorder;

View File

@ -8,7 +8,7 @@ use {
log::*,
solana_client::thin_client::{create_client, ThinClient},
solana_core::{
consensus::FileTowerStorage,
tower_storage::FileTowerStorage,
validator::{Validator, ValidatorConfig, ValidatorStartProgress},
},
solana_gossip::{

View File

@ -16,9 +16,10 @@ use {
broadcast_stage::{
broadcast_duplicates_run::BroadcastDuplicatesConfig, BroadcastStageType,
},
consensus::{FileTowerStorage, Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH},
consensus::{Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH},
optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
replay_stage::DUPLICATE_THRESHOLD,
tower_storage::FileTowerStorage,
validator::ValidatorConfig,
},
solana_download_utils::download_snapshot,

View File

@ -6,8 +6,7 @@ use {
jsonrpc_server_utils::tokio,
log::*,
solana_core::{
consensus::{Tower, TowerStorage},
validator::ValidatorStartProgress,
consensus::Tower, tower_storage::TowerStorage, validator::ValidatorStartProgress,
},
solana_gossip::cluster_info::ClusterInfo,
solana_sdk::{

View File

@ -8,7 +8,7 @@ use {
},
},
solana_client::rpc_client::RpcClient,
solana_core::consensus::FileTowerStorage,
solana_core::tower_storage::FileTowerStorage,
solana_faucet::faucet::{run_local_faucet_with_port, FAUCET_PORT},
solana_rpc::rpc::JsonRpcConfig,
solana_sdk::{

View File

@ -20,8 +20,8 @@ use {
rpc_request::MAX_MULTIPLE_ACCOUNTS,
},
solana_core::{
consensus::FileTowerStorage,
ledger_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS},
tower_storage::FileTowerStorage,
tpu::DEFAULT_TPU_COALESCE_MS,
validator::{
is_snapshot_config_invalid, Validator, ValidatorConfig, ValidatorStartProgress,