From e9722474eb0a79be9e8223f0819c0d6a070de9a2 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Mon, 9 Aug 2021 11:32:48 -0700 Subject: [PATCH] Move tower storage into its own module --- core/benches/consensus.rs | 3 +- core/src/consensus.rs | 128 +-------------------- core/src/lib.rs | 1 + core/src/replay_stage.rs | 8 +- core/src/tower_storage.rs | 128 +++++++++++++++++++++ core/src/tvu.rs | 5 +- core/src/validator.rs | 3 +- core/src/voting_service.rs | 2 +- local-cluster/src/local_cluster.rs | 2 +- local-cluster/tests/local_cluster.rs | 3 +- validator/src/admin_rpc_service.rs | 3 +- validator/src/bin/solana-test-validator.rs | 2 +- validator/src/main.rs | 2 +- 13 files changed, 152 insertions(+), 138 deletions(-) create mode 100644 core/src/tower_storage.rs diff --git a/core/benches/consensus.rs b/core/benches/consensus.rs index abfed7adf0..b72b955bc0 100644 --- a/core/benches/consensus.rs +++ b/core/benches/consensus.rs @@ -5,8 +5,7 @@ extern crate test; use { solana_core::{ - consensus::{FileTowerStorage, Tower}, - vote_simulator::VoteSimulator, + consensus::Tower, tower_storage::FileTowerStorage, vote_simulator::VoteSimulator, }, solana_runtime::bank::Bank, solana_runtime::bank_forks::BankForks, diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 5a0f4b2eda..000d58f5e6 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -3,6 +3,7 @@ use { heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, progress_map::{LockoutIntervals, ProgressMap}, + tower_storage::{SavedTower, TowerStorage}, }, chrono::prelude::*, solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_db}, @@ -15,7 +16,7 @@ use { hash::Hash, instruction::Instruction, pubkey::Pubkey, - signature::{Keypair, Signature, Signer}, + signature::Keypair, slot_history::{Check, SlotHistory}, }, solana_vote_program::{ @@ -25,13 +26,10 @@ use { std::{ cmp::Ordering, collections::{HashMap, HashSet}, - fs::{self, File}, - io::BufReader, ops::{ Bound::{Included, Unbounded}, Deref, }, - path::PathBuf, }, thiserror::Error, }; @@ -107,7 +105,7 @@ pub(crate) struct ComputedBankState { #[frozen_abi(digest = "GMs1FxKteU7K4ZFRofMBqNhBpM4xkPVxfYod6R8DQmpT")] #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)] pub struct Tower { - node_pubkey: Pubkey, + pub(crate) node_pubkey: Pubkey, threshold_depth: usize, threshold_size: f64, vote_state: VoteState, @@ -1225,122 +1223,6 @@ impl TowerError { } } -pub trait TowerStorage: Sync + Send { - fn load(&self, node_pubkey: &Pubkey) -> Result; - fn store(&self, saved_tower: &SavedTower) -> Result<()>; -} - -#[derive(Debug, Default, Clone, PartialEq)] -pub struct NullTowerStorage {} - -impl TowerStorage for NullTowerStorage { - fn load(&self, _node_pubkey: &Pubkey) -> Result { - Err(TowerError::WrongTower( - "NullTowerStorage has no storage".into(), - )) - } - - fn store(&self, _saved_tower: &SavedTower) -> Result<()> { - Ok(()) - } -} - -#[derive(Debug, Default, Clone, PartialEq)] -pub struct FileTowerStorage { - pub tower_path: PathBuf, -} - -impl FileTowerStorage { - pub fn new(tower_path: PathBuf) -> Self { - Self { tower_path } - } - - pub fn filename(&self, node_pubkey: &Pubkey) -> PathBuf { - self.tower_path - .join(format!("tower-{}", node_pubkey)) - .with_extension("bin") - } -} - -impl TowerStorage for FileTowerStorage { - fn load(&self, node_pubkey: &Pubkey) -> Result { - let filename = self.filename(node_pubkey); - trace!("load {}", filename.display()); - - // Ensure to create parent dir here, because restore() precedes save() always - fs::create_dir_all(&filename.parent().unwrap())?; - - let file = File::open(&filename)?; - let mut stream = BufReader::new(file); - bincode::deserialize_from(&mut stream).map_err(|e| e.into()) - } - - fn store(&self, saved_tower: &SavedTower) -> Result<()> { - let filename = self.filename(&saved_tower.node_pubkey); - trace!("store: {}", filename.display()); - let new_filename = filename.with_extension("bin.new"); - - { - // overwrite anything if exists - let mut file = File::create(&new_filename)?; - bincode::serialize_into(&mut file, saved_tower)?; - // file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster! - } - fs::rename(&new_filename, &filename)?; - // self.path.parent().sync_all() hurts performance same as the above sync - Ok(()) - } -} - -#[frozen_abi(digest = "Gaxfwvx5MArn52mKZQgzHmDCyn5YfCuTHvp5Et3rFfpp")] -#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)] -pub struct SavedTower { - signature: Signature, - data: Vec, - #[serde(skip)] - node_pubkey: Pubkey, -} - -impl SavedTower { - pub fn new(tower: &Tower, keypair: &T) -> Result { - let node_pubkey = keypair.pubkey(); - if tower.node_pubkey != node_pubkey { - return Err(TowerError::WrongTower(format!( - "node_pubkey is {:?} but found tower for {:?}", - node_pubkey, tower.node_pubkey - ))); - } - - let data = bincode::serialize(tower)?; - let signature = keypair.sign_message(&data); - Ok(Self { - signature, - data, - node_pubkey, - }) - } - - pub fn try_into_tower(self, node_pubkey: &Pubkey) -> Result { - // This method assumes that `self` was just deserialized - assert_eq!(self.node_pubkey, Pubkey::default()); - - if !self.signature.verify(node_pubkey.as_ref(), &self.data) { - return Err(TowerError::InvalidSignature); - } - bincode::deserialize(&self.data) - .map_err(|e| e.into()) - .and_then(|tower: Tower| { - if tower.node_pubkey != *node_pubkey { - return Err(TowerError::WrongTower(format!( - "node_pubkey is {:?} but found tower for {:?}", - node_pubkey, tower.node_pubkey - ))); - } - Ok(tower) - }) - } -} - // Given an untimely crash, tower may have roots that are not reflected in blockstore, // or the reverse of this. // That's because we don't impose any ordering guarantee or any kind of write barriers @@ -1391,7 +1273,8 @@ pub mod test { use super::*; use crate::{ fork_choice::ForkChoice, heaviest_subtree_fork_choice::SlotHashKey, - replay_stage::HeaviestForkFailures, vote_simulator::VoteSimulator, + replay_stage::HeaviestForkFailures, tower_storage::FileTowerStorage, + vote_simulator::VoteSimulator, }; use solana_ledger::{blockstore::make_slot_entries, get_tmp_ledger_path}; use solana_runtime::bank::Bank; @@ -1408,6 +1291,7 @@ pub mod test { collections::HashMap, fs::{remove_file, OpenOptions}, io::{Read, Seek, SeekFrom, Write}, + path::PathBuf, sync::Arc, }; use tempfile::TempDir; diff --git a/core/src/lib.rs b/core/src/lib.rs index db336c73a1..8bf8e88291 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -53,6 +53,7 @@ pub mod sigverify_shreds; pub mod sigverify_stage; pub mod snapshot_packager_service; pub mod test_validator; +pub mod tower_storage; pub mod tpu; pub mod tree_diff; pub mod tvu; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 9a4e682914..06c2a166c3 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -12,8 +12,7 @@ use { cluster_slots_service::ClusterSlotsUpdateSender, commitment_service::{AggregateCommitmentService, CommitmentAggregationData}, consensus::{ - ComputedBankState, SavedTower, Stake, SwitchForkDecision, Tower, TowerStorage, - VotedStakes, SWITCH_FORK_THRESHOLD, + ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes, SWITCH_FORK_THRESHOLD, }, fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, @@ -21,6 +20,7 @@ use { progress_map::{ForkProgress, ProgressMap, PropagatedStats}, repair_service::DuplicateSlotsResetReceiver, rewards_recorder_service::RewardsRecorderSender, + tower_storage::{SavedTower, TowerStorage}, unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, voting_service::VoteOp, window_service::DuplicateSlotReceiver, @@ -2738,7 +2738,7 @@ impl ReplayStage { pub mod tests { use super::*; use crate::{ - consensus::{NullTowerStorage, Tower}, + consensus::Tower, progress_map::ValidatorStakeInfo, replay_stage::ReplayStage, tree_diff::TreeDiff, @@ -5419,7 +5419,7 @@ pub mod tests { vote_simulator, .. } = replay_blockstore_components(None, 10, None::); - let tower_storage = NullTowerStorage::default(); + let tower_storage = crate::tower_storage::NullTowerStorage::default(); let VoteSimulator { mut validator_keypairs, diff --git a/core/src/tower_storage.rs b/core/src/tower_storage.rs new file mode 100644 index 0000000000..dedfacc6db --- /dev/null +++ b/core/src/tower_storage.rs @@ -0,0 +1,128 @@ +use { + crate::consensus::{Result, Tower, TowerError}, + solana_sdk::{ + pubkey::Pubkey, + signature::{Signature, Signer}, + }, + std::{ + fs::{self, File}, + io::BufReader, + path::PathBuf, + }, +}; + +#[frozen_abi(digest = "Gaxfwvx5MArn52mKZQgzHmDCyn5YfCuTHvp5Et3rFfpp")] +#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)] +pub struct SavedTower { + signature: Signature, + data: Vec, + #[serde(skip)] + node_pubkey: Pubkey, +} + +impl SavedTower { + pub fn new(tower: &Tower, keypair: &T) -> Result { + let node_pubkey = keypair.pubkey(); + if tower.node_pubkey != node_pubkey { + return Err(TowerError::WrongTower(format!( + "node_pubkey is {:?} but found tower for {:?}", + node_pubkey, tower.node_pubkey + ))); + } + + let data = bincode::serialize(tower)?; + let signature = keypair.sign_message(&data); + Ok(Self { + signature, + data, + node_pubkey, + }) + } + + pub fn try_into_tower(self, node_pubkey: &Pubkey) -> Result { + // This method assumes that `self` was just deserialized + assert_eq!(self.node_pubkey, Pubkey::default()); + + if !self.signature.verify(node_pubkey.as_ref(), &self.data) { + return Err(TowerError::InvalidSignature); + } + bincode::deserialize(&self.data) + .map_err(|e| e.into()) + .and_then(|tower: Tower| { + if tower.node_pubkey != *node_pubkey { + return Err(TowerError::WrongTower(format!( + "node_pubkey is {:?} but found tower for {:?}", + node_pubkey, tower.node_pubkey + ))); + } + Ok(tower) + }) + } +} + +pub trait TowerStorage: Sync + Send { + fn load(&self, node_pubkey: &Pubkey) -> Result; + fn store(&self, saved_tower: &SavedTower) -> Result<()>; +} + +#[derive(Debug, Default, Clone, PartialEq)] +pub struct NullTowerStorage {} + +impl TowerStorage for NullTowerStorage { + fn load(&self, _node_pubkey: &Pubkey) -> Result { + Err(TowerError::WrongTower( + "NullTowerStorage has no storage".into(), + )) + } + + fn store(&self, _saved_tower: &SavedTower) -> Result<()> { + Ok(()) + } +} + +#[derive(Debug, Default, Clone, PartialEq)] +pub struct FileTowerStorage { + pub tower_path: PathBuf, +} + +impl FileTowerStorage { + pub fn new(tower_path: PathBuf) -> Self { + Self { tower_path } + } + + pub fn filename(&self, node_pubkey: &Pubkey) -> PathBuf { + self.tower_path + .join(format!("tower-{}", node_pubkey)) + .with_extension("bin") + } +} + +impl TowerStorage for FileTowerStorage { + fn load(&self, node_pubkey: &Pubkey) -> Result { + let filename = self.filename(node_pubkey); + trace!("load {}", filename.display()); + + // Ensure to create parent dir here, because restore() precedes save() always + fs::create_dir_all(&filename.parent().unwrap())?; + + let file = File::open(&filename)?; + let mut stream = BufReader::new(file); + bincode::deserialize_from(&mut stream).map_err(|e| e.into()) + } + + fn store(&self, saved_tower: &SavedTower) -> Result<()> { + let filename = self.filename(&saved_tower.node_pubkey); + trace!("store: {}", filename.display()); + let new_filename = filename.with_extension("bin.new"); + + { + // overwrite anything if exists + let mut file = File::create(&new_filename)?; + bincode::serialize_into(&mut file, saved_tower)?; + // file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster! + } + fs::rename(&new_filename, &filename)?; + // self.path.parent().sync_all() hurts performance same as the above sync + Ok(()) + } +} diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 0da95dd4bd..3c0ffbd1e3 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -11,7 +11,7 @@ use crate::{ }, cluster_slots::ClusterSlots, completed_data_sets_service::CompletedDataSetsSender, - consensus::{Tower, TowerStorage}, + consensus::Tower, cost_model::CostModel, cost_update_service::CostUpdateService, ledger_cleanup_service::LedgerCleanupService, @@ -22,6 +22,7 @@ use crate::{ sigverify_shreds::ShredSigVerifier, sigverify_stage::SigVerifyStage, snapshot_packager_service::PendingSnapshotPackage, + tower_storage::TowerStorage, voting_service::VotingService, }; use crossbeam_channel::unbounded; @@ -455,7 +456,7 @@ pub mod tests { )), &poh_recorder, tower, - Arc::new(crate::consensus::FileTowerStorage::default()), + Arc::new(crate::tower_storage::FileTowerStorage::default()), &leader_schedule_cache, &exit, block_commitment_cache, diff --git a/core/src/validator.rs b/core/src/validator.rs index ddf9b6eccb..80bc2733be 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -5,7 +5,7 @@ use crate::{ cache_block_meta_service::{CacheBlockMetaSender, CacheBlockMetaService}, cluster_info_vote_listener::VoteTracker, completed_data_sets_service::CompletedDataSetsService, - consensus::{reconcile_blockstore_roots_with_tower, FileTowerStorage, Tower, TowerStorage}, + consensus::{reconcile_blockstore_roots_with_tower, Tower}, cost_model::CostModel, rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService}, sample_performance_service::SamplePerformanceService, @@ -13,6 +13,7 @@ use crate::{ serve_repair_service::ServeRepairService, sigverify, snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService}, + tower_storage::{FileTowerStorage, TowerStorage}, tpu::{Tpu, DEFAULT_TPU_COALESCE_MS}, tvu::{Sockets, Tvu, TvuConfig}, }; diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs index 8f18e84eeb..189d101f0c 100644 --- a/core/src/voting_service.rs +++ b/core/src/voting_service.rs @@ -1,4 +1,4 @@ -use crate::consensus::{SavedTower, TowerStorage}; +use crate::tower_storage::{SavedTower, TowerStorage}; use solana_gossip::cluster_info::ClusterInfo; use solana_measure::measure::Measure; use solana_poh::poh_recorder::PohRecorder; diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 36db01392b..cae0f5e2a8 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -8,7 +8,7 @@ use { log::*, solana_client::thin_client::{create_client, ThinClient}, solana_core::{ - consensus::FileTowerStorage, + tower_storage::FileTowerStorage, validator::{Validator, ValidatorConfig, ValidatorStartProgress}, }, solana_gossip::{ diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index dda0c023bb..ab3f0967a8 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -16,9 +16,10 @@ use { broadcast_stage::{ broadcast_duplicates_run::BroadcastDuplicatesConfig, BroadcastStageType, }, - consensus::{FileTowerStorage, Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH}, + consensus::{Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH}, optimistic_confirmation_verifier::OptimisticConfirmationVerifier, replay_stage::DUPLICATE_THRESHOLD, + tower_storage::FileTowerStorage, validator::ValidatorConfig, }, solana_download_utils::download_snapshot, diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 01c1aae017..78e643bceb 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -6,8 +6,7 @@ use { jsonrpc_server_utils::tokio, log::*, solana_core::{ - consensus::{Tower, TowerStorage}, - validator::ValidatorStartProgress, + consensus::Tower, tower_storage::TowerStorage, validator::ValidatorStartProgress, }, solana_gossip::cluster_info::ClusterInfo, solana_sdk::{ diff --git a/validator/src/bin/solana-test-validator.rs b/validator/src/bin/solana-test-validator.rs index 990658708e..905ad9dd2b 100644 --- a/validator/src/bin/solana-test-validator.rs +++ b/validator/src/bin/solana-test-validator.rs @@ -8,7 +8,7 @@ use { }, }, solana_client::rpc_client::RpcClient, - solana_core::consensus::FileTowerStorage, + solana_core::tower_storage::FileTowerStorage, solana_faucet::faucet::{run_local_faucet_with_port, FAUCET_PORT}, solana_rpc::rpc::JsonRpcConfig, solana_sdk::{ diff --git a/validator/src/main.rs b/validator/src/main.rs index 4974e7e205..e493ebc816 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -20,8 +20,8 @@ use { rpc_request::MAX_MULTIPLE_ACCOUNTS, }, solana_core::{ - consensus::FileTowerStorage, ledger_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS}, + tower_storage::FileTowerStorage, tpu::DEFAULT_TPU_COALESCE_MS, validator::{ is_snapshot_config_invalid, Validator, ValidatorConfig, ValidatorStartProgress,