diff --git a/core/benches/consensus.rs b/core/benches/consensus.rs index 662dc32231..abfed7adf0 100644 --- a/core/benches/consensus.rs +++ b/core/benches/consensus.rs @@ -3,48 +3,49 @@ extern crate solana_core; extern crate test; -use solana_core::{consensus::Tower, vote_simulator::VoteSimulator}; -use solana_runtime::bank::Bank; -use solana_runtime::bank_forks::BankForks; -use solana_sdk::{ - pubkey::Pubkey, - signature::{Keypair, Signer}, +use { + solana_core::{ + consensus::{FileTowerStorage, Tower}, + vote_simulator::VoteSimulator, + }, + solana_runtime::bank::Bank, + solana_runtime::bank_forks::BankForks, + solana_sdk::{ + pubkey::Pubkey, + signature::{Keypair, Signer}, + }, + std::{ + collections::{HashMap, HashSet}, + sync::Arc, + }, + tempfile::TempDir, + test::Bencher, + trees::tr, }; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; -use tempfile::TempDir; -use test::Bencher; -use trees::tr; #[bench] fn bench_save_tower(bench: &mut Bencher) { let dir = TempDir::new().unwrap(); - let path = dir.path(); let vote_account_pubkey = &Pubkey::default(); let node_keypair = Arc::new(Keypair::new()); let heaviest_bank = BankForks::new(Bank::default_for_tests()).working_bank(); + let tower_storage = FileTowerStorage::new(dir.path().to_path_buf()); let tower = Tower::new( &node_keypair.pubkey(), vote_account_pubkey, 0, &heaviest_bank, - path, ); bench.iter(move || { - tower.save(&node_keypair).unwrap(); + tower.save(&tower_storage, &node_keypair).unwrap(); }); } #[bench] #[ignore] fn bench_generate_ancestors_descendants(bench: &mut Bencher) { - let dir = TempDir::new().unwrap(); - let path = dir.path(); - let vote_account_pubkey = &Pubkey::default(); let node_keypair = Arc::new(Keypair::new()); let heaviest_bank = BankForks::new(Bank::default_for_tests()).working_bank(); @@ -53,7 +54,6 @@ fn bench_generate_ancestors_descendants(bench: &mut Bencher) { vote_account_pubkey, 0, &heaviest_bank, - path, ); let num_banks = 500; diff --git a/core/src/consensus.rs b/core/src/consensus.rs index a325f22908..7f35c4c51c 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1,39 +1,40 @@ -use crate::{ - heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, - latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, - progress_map::{LockoutIntervals, ProgressMap}, -}; -use chrono::prelude::*; -use solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_db}; -use solana_measure::measure::Measure; -use solana_runtime::{ - bank::Bank, bank_forks::BankForks, commitment::VOTE_THRESHOLD_SIZE, - vote_account::ArcVoteAccount, -}; -use solana_sdk::{ - clock::{Slot, UnixTimestamp}, - hash::Hash, - instruction::Instruction, - pubkey::Pubkey, - signature::{Keypair, Signature, Signer}, - slot_history::{Check, SlotHistory}, -}; -use solana_vote_program::{ - vote_instruction, - vote_state::{BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY}, -}; -use std::{ - cmp::Ordering, - collections::{HashMap, HashSet}, - fs::{self, File}, - io::BufReader, - ops::{ - Bound::{Included, Unbounded}, - Deref, +use { + crate::{ + heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, + latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, + progress_map::{LockoutIntervals, ProgressMap}, }, - path::{Path, PathBuf}, + chrono::prelude::*, + solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_db}, + solana_runtime::{ + bank::Bank, bank_forks::BankForks, commitment::VOTE_THRESHOLD_SIZE, + vote_account::ArcVoteAccount, + }, + solana_sdk::{ + clock::{Slot, UnixTimestamp}, + hash::Hash, + instruction::Instruction, + pubkey::Pubkey, + signature::{Keypair, Signature, Signer}, + slot_history::{Check, SlotHistory}, + }, + solana_vote_program::{ + vote_instruction, + vote_state::{BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY}, + }, + std::{ + cmp::Ordering, + collections::{HashMap, HashSet}, + fs::{self, File}, + io::BufReader, + ops::{ + Bound::{Included, Unbounded}, + Deref, + }, + path::PathBuf, + }, + thiserror::Error, }; -use thiserror::Error; #[derive(PartialEq, Clone, Debug, AbiExample)] pub enum SwitchForkDecision { @@ -119,12 +120,6 @@ pub struct Tower { last_vote_tx_blockhash: Hash, last_timestamp: BlockTimestamp, #[serde(skip)] - pub(crate) ledger_path: PathBuf, - #[serde(skip)] - path: PathBuf, - #[serde(skip)] - tmp_path: PathBuf, // used before atomic fs::rename() - #[serde(skip)] // Restored last voted slot which cannot be found in SlotHistory at replayed root // (This is a special field for slashing-free validator restart with edge cases). // This could be emptied after some time; but left intact indefinitely for easier @@ -146,9 +141,6 @@ impl Default for Tower { last_vote: Vote::default(), last_timestamp: BlockTimestamp::default(), last_vote_tx_blockhash: Hash::default(), - ledger_path: PathBuf::default(), - path: PathBuf::default(), - tmp_path: PathBuf::default(), stray_restored_slot: Option::default(), last_switch_threshold_check: Option::default(), }; @@ -164,25 +156,15 @@ impl Tower { vote_account_pubkey: &Pubkey, root: Slot, bank: &Bank, - ledger_path: &Path, ) -> Self { let mut tower = Tower { - ledger_path: ledger_path.into(), + node_pubkey: *node_pubkey, ..Tower::default() }; - tower.set_identity(*node_pubkey); tower.initialize_lockouts_from_bank(vote_account_pubkey, root, bank); tower } - fn set_identity(&mut self, node_pubkey: Pubkey) { - let path = Self::get_filename(&self.ledger_path, &node_pubkey); - let tmp_path = Self::get_tmp_filename(&path); - self.node_pubkey = node_pubkey; - self.path = path; - self.tmp_path = tmp_path; - } - #[cfg(test)] pub fn new_for_tests(threshold_depth: usize, threshold_size: f64) -> Self { Self { @@ -194,7 +176,6 @@ impl Tower { pub fn new_from_bankforks( bank_forks: &BankForks, - ledger_path: &Path, node_pubkey: &Pubkey, vote_account: &Pubkey, ) -> Self { @@ -216,7 +197,7 @@ impl Tower { ) .clone(); - Self::new(node_pubkey, vote_account, root, &heaviest_bank, ledger_path) + Self::new(node_pubkey, vote_account, root, &heaviest_bank) } pub(crate) fn collect_vote_lockouts( @@ -1195,71 +1176,15 @@ impl Tower { self.vote_state.root_slot = Some(root); } - pub fn get_filename(path: &Path, node_pubkey: &Pubkey) -> PathBuf { - path.join(format!("tower-{}", node_pubkey)) - .with_extension("bin") - } - - fn get_tmp_filename(path: &Path) -> PathBuf { - path.with_extension("bin.new") - } - - pub fn save(&self, node_keypair: &Keypair) -> Result<()> { - let mut measure = Measure::start("tower_save-ms"); - - if self.node_pubkey != node_keypair.pubkey() { - return Err(TowerError::WrongTower(format!( - "node_pubkey is {:?} but found tower for {:?}", - node_keypair.pubkey(), - self.node_pubkey - ))); - } - - let filename = &self.path; - let new_filename = &self.tmp_path; - { - // overwrite anything if exists - let mut file = File::create(&new_filename)?; - let saved_tower = SavedTower::new(self, node_keypair)?; - bincode::serialize_into(&mut file, &saved_tower)?; - // file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster! - } - trace!("persisted votes: {:?}", self.voted_slots()); - fs::rename(&new_filename, &filename)?; - // self.path.parent().sync_all() hurts performance same as the above sync - - measure.stop(); - inc_new_counter_info!("tower_save-ms", measure.as_ms() as usize); - + pub fn save(&self, tower_storage: &dyn TowerStorage, node_keypair: &Keypair) -> Result<()> { + let saved_tower = SavedTower::new(self, node_keypair)?; + tower_storage.store(&saved_tower)?; Ok(()) } - pub fn restore(ledger_path: &Path, node_pubkey: &Pubkey) -> Result { - let filename = Self::get_filename(ledger_path, node_pubkey); - - // Ensure to create parent dir here, because restore() precedes save() always - fs::create_dir_all(&filename.parent().unwrap())?; - - let file = File::open(&filename)?; - let mut stream = BufReader::new(file); - - let saved_tower: SavedTower = bincode::deserialize_from(&mut stream)?; - if !saved_tower.verify(node_pubkey) { - return Err(TowerError::InvalidSignature); - } - let mut tower = saved_tower.deserialize()?; - tower.ledger_path = ledger_path.into(); - tower.path = filename; - tower.tmp_path = Self::get_tmp_filename(&tower.path); - - // check that the tower actually belongs to this node - if &tower.node_pubkey != node_pubkey { - return Err(TowerError::WrongTower(format!( - "node_pubkey is {:?} but found tower for {:?}", - node_pubkey, tower.node_pubkey - ))); - } - Ok(tower) + pub fn restore(tower_storage: &dyn TowerStorage, node_pubkey: &Pubkey) -> Result { + let saved_tower = tower_storage.load(node_pubkey)?; + saved_tower.try_into_tower(node_pubkey) } } @@ -1300,26 +1225,104 @@ impl TowerError { } } +pub trait TowerStorage: Sync + Send { + fn load(&self, node_pubkey: &Pubkey) -> Result; + fn store(&self, saved_tower: &SavedTower) -> Result<()>; +} + +#[derive(Debug, Default, Clone, PartialEq)] +pub struct FileTowerStorage { + pub tower_path: PathBuf, +} + +impl FileTowerStorage { + pub fn new(tower_path: PathBuf) -> Self { + Self { tower_path } + } + + pub fn filename(&self, node_pubkey: &Pubkey) -> PathBuf { + self.tower_path + .join(format!("tower-{}", node_pubkey)) + .with_extension("bin") + } +} + +impl TowerStorage for FileTowerStorage { + fn load(&self, node_pubkey: &Pubkey) -> Result { + let filename = self.filename(node_pubkey); + trace!("load {}", filename.display()); + + // Ensure to create parent dir here, because restore() precedes save() always + fs::create_dir_all(&filename.parent().unwrap())?; + + let file = File::open(&filename)?; + let mut stream = BufReader::new(file); + bincode::deserialize_from(&mut stream).map_err(|e| e.into()) + } + + fn store(&self, saved_tower: &SavedTower) -> Result<()> { + let filename = self.filename(&saved_tower.node_pubkey); + trace!("store: {}", filename.display()); + let new_filename = filename.with_extension("bin.new"); + + { + // overwrite anything if exists + let mut file = File::create(&new_filename)?; + bincode::serialize_into(&mut file, saved_tower)?; + // file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster! + } + fs::rename(&new_filename, &filename)?; + // self.path.parent().sync_all() hurts performance same as the above sync + Ok(()) + } +} + #[frozen_abi(digest = "Gaxfwvx5MArn52mKZQgzHmDCyn5YfCuTHvp5Et3rFfpp")] #[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)] pub struct SavedTower { signature: Signature, data: Vec, + #[serde(skip)] + node_pubkey: Pubkey, } impl SavedTower { pub fn new(tower: &Tower, keypair: &T) -> Result { + let node_pubkey = keypair.pubkey(); + if tower.node_pubkey != node_pubkey { + return Err(TowerError::WrongTower(format!( + "node_pubkey is {:?} but found tower for {:?}", + node_pubkey, tower.node_pubkey + ))); + } + let data = bincode::serialize(tower)?; let signature = keypair.sign_message(&data); - Ok(Self { signature, data }) + Ok(Self { + signature, + data, + node_pubkey, + }) } - pub fn verify(&self, pubkey: &Pubkey) -> bool { - self.signature.verify(pubkey.as_ref(), &self.data) - } + pub fn try_into_tower(self, node_pubkey: &Pubkey) -> Result { + // This method assumes that `self` was just deserialized + assert_eq!(self.node_pubkey, Pubkey::default()); - pub fn deserialize(&self) -> Result { - bincode::deserialize(&self.data).map_err(|e| e.into()) + if !self.signature.verify(node_pubkey.as_ref(), &self.data) { + return Err(TowerError::InvalidSignature); + } + bincode::deserialize(&self.data) + .map_err(|e| e.into()) + .and_then(|tower: Tower| { + if tower.node_pubkey != *node_pubkey { + return Err(TowerError::WrongTower(format!( + "node_pubkey is {:?} but found tower for {:?}", + node_pubkey, tower.node_pubkey + ))); + } + Ok(tower) + }) } } @@ -2518,20 +2521,20 @@ pub mod test { F: Fn(&mut Tower, &Pubkey), G: Fn(&PathBuf), { - let dir = TempDir::new().unwrap(); + let tower_path = TempDir::new().unwrap(); let identity_keypair = Arc::new(Keypair::new()); + let node_pubkey = identity_keypair.pubkey(); // Use values that will not match the default derived from BankForks let mut tower = Tower::new_for_tests(10, 0.9); - tower.ledger_path = dir.path().to_path_buf(); - tower.path = Tower::get_filename(&tower.ledger_path, &identity_keypair.pubkey()); - tower.tmp_path = Tower::get_tmp_filename(&tower.path); - modify_original(&mut tower, &identity_keypair.pubkey()); + let tower_storage = FileTowerStorage::new(tower_path.path().to_path_buf()); - tower.save(&identity_keypair).unwrap(); - modify_serialized(&tower.path); - let loaded = Tower::restore(dir.path(), &identity_keypair.pubkey()); + modify_original(&mut tower, &node_pubkey); + + tower.save(&tower_storage, &identity_keypair).unwrap(); + modify_serialized(&tower_storage.filename(&node_pubkey)); + let loaded = Tower::restore(&tower_storage, &node_pubkey); (tower, loaded) } @@ -2760,8 +2763,9 @@ pub mod test { fn test_load_tower_wrong_identity() { let identity_keypair = Arc::new(Keypair::new()); let tower = Tower::default(); + let tower_storage = FileTowerStorage::default(); assert_matches!( - tower.save(&identity_keypair), + tower.save(&tower_storage, &identity_keypair), Err(TowerError::WrongTower(_)) ) } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index df83eaf384..e0b3380155 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1,70 +1,73 @@ //! The `replay_stage` replays transactions broadcast by the leader. - -use crate::{ - ancestor_hashes_service::AncestorHashesReplayUpdateSender, - broadcast_stage::RetransmitSlotsSender, - cache_block_meta_service::CacheBlockMetaSender, - cluster_info_vote_listener::{ - GossipDuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver, VoteTracker, +use { + crate::{ + ancestor_hashes_service::AncestorHashesReplayUpdateSender, + broadcast_stage::RetransmitSlotsSender, + cache_block_meta_service::CacheBlockMetaSender, + cluster_info_vote_listener::{ + GossipDuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver, VoteTracker, + }, + cluster_slot_state_verifier::*, + cluster_slots::ClusterSlots, + cluster_slots_service::ClusterSlotsUpdateSender, + commitment_service::{AggregateCommitmentService, CommitmentAggregationData}, + consensus::{ + ComputedBankState, Stake, SwitchForkDecision, Tower, TowerStorage, VotedStakes, + SWITCH_FORK_THRESHOLD, + }, + fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, + heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, + latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, + progress_map::{ForkProgress, ProgressMap, PropagatedStats}, + repair_service::DuplicateSlotsResetReceiver, + rewards_recorder_service::RewardsRecorderSender, + unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, + voting_service::VoteOp, + window_service::DuplicateSlotReceiver, }, - cluster_slot_state_verifier::*, - cluster_slots::ClusterSlots, - cluster_slots_service::ClusterSlotsUpdateSender, - commitment_service::{AggregateCommitmentService, CommitmentAggregationData}, - consensus::{ - ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes, SWITCH_FORK_THRESHOLD, + solana_client::rpc_response::SlotUpdate, + solana_entry::entry::VerifyRecyclers, + solana_gossip::cluster_info::ClusterInfo, + solana_ledger::{ + block_error::BlockError, + blockstore::Blockstore, + blockstore_processor::{self, BlockstoreProcessorError, TransactionStatusSender}, + leader_schedule_cache::LeaderScheduleCache, }, - fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, - heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, - latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, - progress_map::{ForkProgress, ProgressMap, PropagatedStats}, - repair_service::DuplicateSlotsResetReceiver, - rewards_recorder_service::RewardsRecorderSender, - unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, - voting_service::VoteOp, - window_service::DuplicateSlotReceiver, -}; -use solana_client::rpc_response::SlotUpdate; -use solana_entry::entry::VerifyRecyclers; -use solana_gossip::cluster_info::ClusterInfo; -use solana_ledger::{ - block_error::BlockError, - blockstore::Blockstore, - blockstore_processor::{self, BlockstoreProcessorError, TransactionStatusSender}, - leader_schedule_cache::LeaderScheduleCache, -}; -use solana_measure::measure::Measure; -use solana_metrics::inc_new_counter_info; -use solana_poh::poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}; -use solana_rpc::{ - optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, - rpc_subscriptions::RpcSubscriptions, -}; -use solana_runtime::{ - accounts_background_service::AbsRequestSender, bank::Bank, bank::ExecuteTimings, - bank_forks::BankForks, commitment::BlockCommitmentCache, vote_sender_types::ReplayVoteSender, -}; -use solana_sdk::{ - clock::{BankId, Slot, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS}, - genesis_config::ClusterType, - hash::Hash, - pubkey::Pubkey, - signature::Signature, - signature::{Keypair, Signer}, - timing::timestamp, - transaction::Transaction, -}; -use solana_vote_program::vote_state::Vote; -use std::{ - collections::{HashMap, HashSet}, - result, - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::{Receiver, RecvTimeoutError, Sender}, - Arc, Mutex, RwLock, + solana_measure::measure::Measure, + solana_metrics::inc_new_counter_info, + solana_poh::poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, + solana_rpc::{ + optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, + rpc_subscriptions::RpcSubscriptions, + }, + solana_runtime::{ + accounts_background_service::AbsRequestSender, bank::Bank, bank::ExecuteTimings, + bank_forks::BankForks, commitment::BlockCommitmentCache, + vote_sender_types::ReplayVoteSender, + }, + solana_sdk::{ + clock::{BankId, Slot, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS}, + genesis_config::ClusterType, + hash::Hash, + pubkey::Pubkey, + signature::Signature, + signature::{Keypair, Signer}, + timing::timestamp, + transaction::Transaction, + }, + solana_vote_program::vote_state::Vote, + std::{ + collections::{HashMap, HashSet}, + result, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{Receiver, RecvTimeoutError, Sender}, + Arc, Mutex, RwLock, + }, + thread::{self, Builder, JoinHandle}, + time::{Duration, Instant}, }, - thread::{self, Builder, JoinHandle}, - time::{Duration, Instant}, }; pub const MAX_ENTRY_RECV_PER_ITER: usize = 512; @@ -128,6 +131,7 @@ pub struct ReplayStageConfig { pub bank_notification_sender: Option, pub wait_for_vote_to_start_leader: bool, pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender, + pub tower_storage: Arc, } #[derive(Default)] @@ -336,6 +340,7 @@ impl ReplayStage { bank_notification_sender, wait_for_vote_to_start_leader, ancestor_hashes_replay_update_sender, + tower_storage, } = config; trace!("replay stage"); @@ -593,6 +598,7 @@ impl ReplayStage { switch_fork_decision, &bank_forks, &mut tower, + tower_storage.as_ref(), &mut progress, &vote_account, &identity_keypair, @@ -648,7 +654,7 @@ impl ReplayStage { my_pubkey = identity_keypair.pubkey(); // Load the new identity's tower - tower = Tower::restore(&tower.ledger_path, &my_pubkey) + tower = Tower::restore(tower_storage.as_ref(), &my_pubkey) .and_then(|restored_tower| { let root_bank = bank_forks.read().unwrap().root_bank(); let slot_history = root_bank.get_slot_history(); @@ -1482,6 +1488,7 @@ impl ReplayStage { switch_fork_decision: &SwitchForkDecision, bank_forks: &Arc>, tower: &mut Tower, + tower_storage: &dyn TowerStorage, progress: &mut ProgressMap, vote_account_pubkey: &Pubkey, identity_keypair: &Keypair, @@ -1509,9 +1516,14 @@ impl ReplayStage { trace!("handle votable bank {}", bank.slot()); let new_root = tower.record_bank_vote(bank, vote_account_pubkey); - if let Err(err) = tower.save(identity_keypair) { - error!("Unable to save tower: {:?}", err); - std::process::exit(1); + { + let mut measure = Measure::start("tower_save-ms"); + if let Err(err) = tower.save(tower_storage, identity_keypair) { + error!("Unable to save tower: {:?}", err); + std::process::exit(1); + } + measure.stop(); + inc_new_counter_info!("tower_save-ms", measure.as_ms() as usize); } if let Some(new_root) = new_root { @@ -2877,7 +2889,6 @@ pub mod tests { let my_vote_pubkey = my_keypairs.vote_keypair.pubkey(); let tower = Tower::new_from_bankforks( &bank_forks.read().unwrap(), - blockstore.ledger_path(), &cluster_info.id(), &my_vote_pubkey, ); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index e08bc38a70..ad204b56dc 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -11,7 +11,7 @@ use crate::{ }, cluster_slots::ClusterSlots, completed_data_sets_service::CompletedDataSetsSender, - consensus::Tower, + consensus::{Tower, TowerStorage}, cost_model::CostModel, cost_update_service::CostUpdateService, ledger_cleanup_service::LedgerCleanupService, @@ -114,6 +114,7 @@ impl Tvu { rpc_subscriptions: &Arc, poh_recorder: &Arc>, tower: Tower, + tower_storage: Arc, leader_schedule_cache: &Arc, exit: &Arc, block_commitment_cache: Arc>, @@ -277,6 +278,7 @@ impl Tvu { bank_notification_sender, wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader, ancestor_hashes_replay_update_sender, + tower_storage, }; let (voting_sender, voting_receiver) = channel(); @@ -449,6 +451,7 @@ pub mod tests { )), &poh_recorder, tower, + Arc::new(crate::consensus::FileTowerStorage::default()), &leader_schedule_cache, &exit, block_commitment_cache, diff --git a/core/src/validator.rs b/core/src/validator.rs index fdc528a344..30d70a1ad4 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -5,7 +5,7 @@ use crate::{ cache_block_meta_service::{CacheBlockMetaSender, CacheBlockMetaService}, cluster_info_vote_listener::VoteTracker, completed_data_sets_service::CompletedDataSetsService, - consensus::{reconcile_blockstore_roots_with_tower, Tower}, + consensus::{reconcile_blockstore_roots_with_tower, FileTowerStorage, Tower, TowerStorage}, cost_model::CostModel, rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService}, sample_performance_service::SamplePerformanceService, @@ -90,7 +90,6 @@ use std::{ const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000; const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 90; -#[derive(Debug)] pub struct ValidatorConfig { pub dev_halt_at_slot: Option, pub expected_genesis_hash: Option, @@ -124,7 +123,7 @@ pub struct ValidatorConfig { pub wal_recovery_mode: Option, pub poh_verify: bool, // Perform PoH verification during blockstore processing at boo pub require_tower: bool, - pub tower_path: Option, + pub tower_storage: Arc, pub debug_keys: Option>>, pub contact_debug_interval: u64, pub contact_save_interval: u64, @@ -181,7 +180,7 @@ impl Default for ValidatorConfig { wal_recovery_mode: None, poh_verify: true, require_tower: false, - tower_path: None, + tower_storage: Arc::new(FileTowerStorage::new(PathBuf::default())), debug_keys: None, contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS, contact_save_interval: DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS, @@ -715,6 +714,7 @@ impl Validator { &rpc_subscriptions, &poh_recorder, tower, + config.tower_storage.clone(), &leader_schedule_cache, &exit, block_commitment_cache, @@ -949,7 +949,6 @@ fn post_process_restored_tower( validator_identity: &Pubkey, vote_account: &Pubkey, config: &ValidatorConfig, - tower_path: &Path, bank_forks: &BankForks, ) -> Tower { let mut should_require_tower = config.require_tower; @@ -1028,7 +1027,6 @@ fn post_process_restored_tower( Tower::new_from_bankforks( bank_forks, - tower_path, validator_identity, vote_account, ) @@ -1096,9 +1094,7 @@ fn new_banks_from_ledger( .expect("Failed to open ledger database"); blockstore.set_no_compaction(config.no_rocksdb_compaction); - let tower_path = config.tower_path.as_deref().unwrap_or(ledger_path); - - let restored_tower = Tower::restore(tower_path, validator_identity); + let restored_tower = Tower::restore(config.tower_storage.as_ref(), validator_identity); if let Ok(tower) = &restored_tower { reconcile_blockstore_roots_with_tower(tower, &blockstore).unwrap_or_else(|err| { error!("Failed to reconcile blockstore with tower: {:?}", err); @@ -1219,7 +1215,6 @@ fn new_banks_from_ledger( validator_identity, vote_account, config, - tower_path, &bank_forks, ); diff --git a/local-cluster/src/cluster.rs b/local-cluster/src/cluster.rs index 8211041970..b5bcf658fb 100644 --- a/local-cluster/src/cluster.rs +++ b/local-cluster/src/cluster.rs @@ -1,12 +1,11 @@ -use solana_client::thin_client::ThinClient; -use solana_core::validator::Validator; -use solana_core::validator::ValidatorConfig; -use solana_gossip::{cluster_info::Node, contact_info::ContactInfo}; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::Keypair; -use solana_streamer::socket::SocketAddrSpace; -use std::path::PathBuf; -use std::sync::Arc; +use { + solana_client::thin_client::ThinClient, + solana_core::validator::{Validator, ValidatorConfig}, + solana_gossip::{cluster_info::Node, contact_info::ContactInfo}, + solana_sdk::{pubkey::Pubkey, signature::Keypair}, + solana_streamer::socket::SocketAddrSpace, + std::{path::PathBuf, sync::Arc}, +}; pub struct ValidatorInfo { pub keypair: Arc, diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 1bba0aaaa5..36db01392b 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -1,55 +1,59 @@ -use crate::{ - cluster::{Cluster, ClusterValidatorInfo, ValidatorInfo}, - cluster_tests, - validator_configs::*, -}; -use itertools::izip; -use log::*; -use solana_client::thin_client::{create_client, ThinClient}; -use solana_core::validator::{Validator, ValidatorConfig, ValidatorStartProgress}; -use solana_gossip::{ - cluster_info::{Node, VALIDATOR_PORT_RANGE}, - contact_info::ContactInfo, - gossip_service::discover_cluster, -}; -use solana_ledger::create_new_tmp_ledger; -use solana_runtime::genesis_utils::{ - create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo, - ValidatorVoteKeypairs, -}; -use solana_sdk::{ - account::Account, - account::AccountSharedData, - client::SyncClient, - clock::{DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT}, - commitment_config::CommitmentConfig, - epoch_schedule::EpochSchedule, - genesis_config::{ClusterType, GenesisConfig}, - message::Message, - poh_config::PohConfig, - pubkey::Pubkey, - signature::{Keypair, Signer}, - stake::{ - config as stake_config, instruction as stake_instruction, - state::{Authorized, Lockup}, +use { + crate::{ + cluster::{Cluster, ClusterValidatorInfo, ValidatorInfo}, + cluster_tests, + validator_configs::*, + }, + itertools::izip, + log::*, + solana_client::thin_client::{create_client, ThinClient}, + solana_core::{ + consensus::FileTowerStorage, + validator::{Validator, ValidatorConfig, ValidatorStartProgress}, + }, + solana_gossip::{ + cluster_info::{Node, VALIDATOR_PORT_RANGE}, + contact_info::ContactInfo, + gossip_service::discover_cluster, + }, + solana_ledger::create_new_tmp_ledger, + solana_runtime::genesis_utils::{ + create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo, + ValidatorVoteKeypairs, + }, + solana_sdk::{ + account::Account, + account::AccountSharedData, + client::SyncClient, + clock::{DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT}, + commitment_config::CommitmentConfig, + epoch_schedule::EpochSchedule, + genesis_config::{ClusterType, GenesisConfig}, + message::Message, + poh_config::PohConfig, + pubkey::Pubkey, + signature::{Keypair, Signer}, + stake::{ + config as stake_config, instruction as stake_instruction, + state::{Authorized, Lockup}, + }, + system_transaction, + transaction::Transaction, + }, + solana_stake_program::{config::create_account as create_stake_config_account, stake_state}, + solana_streamer::socket::SocketAddrSpace, + solana_vote_program::{ + vote_instruction, + vote_state::{VoteInit, VoteState}, + }, + std::{ + collections::HashMap, + io::{Error, ErrorKind, Result}, + iter, + sync::{Arc, RwLock}, }, - system_transaction, - transaction::Transaction, -}; -use solana_stake_program::{config::create_account as create_stake_config_account, stake_state}; -use solana_streamer::socket::SocketAddrSpace; -use solana_vote_program::{ - vote_instruction, - vote_state::{VoteInit, VoteState}, -}; -use std::{ - collections::HashMap, - io::{Error, ErrorKind, Result}, - iter, - sync::{Arc, RwLock}, }; -#[derive(Debug)] pub struct ClusterConfig { /// The validator config that should be applied to every node in the cluster pub validator_configs: Vec, @@ -207,6 +211,7 @@ impl LocalCluster { let mut leader_config = safe_clone_config(&config.validator_configs[0]); leader_config.rpc_addrs = Some((leader_node.info.rpc, leader_node.info.rpc_pubsub)); leader_config.account_paths = vec![leader_ledger_path.join("accounts")]; + leader_config.tower_storage = Arc::new(FileTowerStorage::new(leader_ledger_path.clone())); let leader_keypair = Arc::new(Keypair::from_bytes(&leader_keypair.to_bytes()).unwrap()); let leader_vote_keypair = Arc::new(Keypair::from_bytes(&leader_vote_keypair.to_bytes()).unwrap()); @@ -367,6 +372,7 @@ impl LocalCluster { let mut config = safe_clone_config(validator_config); config.rpc_addrs = Some((validator_node.info.rpc, validator_node.info.rpc_pubsub)); config.account_paths = vec![ledger_path.join("accounts")]; + config.tower_storage = Arc::new(FileTowerStorage::new(ledger_path.clone())); let voting_keypair = voting_keypair.unwrap(); let validator_server = Validator::new( validator_node, @@ -704,6 +710,8 @@ impl Cluster for LocalCluster { let validator_info = &cluster_validator_info.info; cluster_validator_info.config.account_paths = vec![validator_info.ledger_path.join("accounts")]; + cluster_validator_info.config.tower_storage = + Arc::new(FileTowerStorage::new(validator_info.ledger_path.clone())); let restarted_node = Validator::new( node, validator_info.keypair.clone(), diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 9f804de1fd..dc514f5810 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -37,7 +37,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { wal_recovery_mode: config.wal_recovery_mode.clone(), poh_verify: config.poh_verify, require_tower: config.require_tower, - tower_path: config.tower_path.clone(), + tower_storage: config.tower_storage.clone(), debug_keys: config.debug_keys.clone(), contact_debug_interval: config.contact_debug_interval, contact_save_interval: config.contact_save_interval, diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 88b9b9ced3..e78bf2e98f 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -1,73 +1,77 @@ #![allow(clippy::integer_arithmetic)] -use assert_matches::assert_matches; -use crossbeam_channel::{unbounded, Receiver}; -use gag::BufferRedirect; -use log::*; -use serial_test::serial; -use solana_client::{ - pubsub_client::PubsubClient, - rpc_client::RpcClient, - rpc_config::{RpcProgramAccountsConfig, RpcSignatureSubscribeConfig}, - rpc_response::RpcSignatureResult, - thin_client::{create_client, ThinClient}, +use { + assert_matches::assert_matches, + crossbeam_channel::{unbounded, Receiver}, + gag::BufferRedirect, + log::*, + serial_test::serial, + solana_client::{ + pubsub_client::PubsubClient, + rpc_client::RpcClient, + rpc_config::{RpcProgramAccountsConfig, RpcSignatureSubscribeConfig}, + rpc_response::RpcSignatureResult, + thin_client::{create_client, ThinClient}, + }, + solana_core::{ + broadcast_stage::{ + broadcast_duplicates_run::BroadcastDuplicatesConfig, BroadcastStageType, + }, + consensus::{FileTowerStorage, Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH}, + optimistic_confirmation_verifier::OptimisticConfirmationVerifier, + replay_stage::DUPLICATE_THRESHOLD, + validator::ValidatorConfig, + }, + solana_download_utils::download_snapshot, + solana_gossip::{ + cluster_info::VALIDATOR_PORT_RANGE, + crds::Cursor, + gossip_service::{self, discover_cluster}, + }, + solana_ledger::{ + ancestor_iterator::AncestorIterator, + blockstore::{Blockstore, PurgeType}, + blockstore_db::AccessType, + leader_schedule::FixedSchedule, + leader_schedule::LeaderSchedule, + }, + solana_local_cluster::{ + cluster::{Cluster, ClusterValidatorInfo}, + cluster_tests, + local_cluster::{ClusterConfig, LocalCluster}, + validator_configs::*, + }, + solana_runtime::{ + snapshot_config::SnapshotConfig, + snapshot_utils::{self, ArchiveFormat}, + }, + solana_sdk::{ + account::AccountSharedData, + client::{AsyncClient, SyncClient}, + clock::{self, Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES}, + commitment_config::CommitmentConfig, + epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, + genesis_config::ClusterType, + hash::Hash, + poh_config::PohConfig, + pubkey::Pubkey, + signature::{Keypair, Signer}, + system_program, system_transaction, + }, + solana_streamer::socket::SocketAddrSpace, + solana_vote_program::{vote_state::MAX_LOCKOUT_HISTORY, vote_transaction}, + std::{ + collections::{BTreeSet, HashMap, HashSet}, + fs, + io::Read, + iter, + path::{Path, PathBuf}, + sync::atomic::{AtomicBool, Ordering}, + sync::Arc, + thread::{sleep, Builder, JoinHandle}, + time::{Duration, Instant}, + }, + tempfile::TempDir, }; -use solana_core::{ - broadcast_stage::{broadcast_duplicates_run::BroadcastDuplicatesConfig, BroadcastStageType}, - consensus::{Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH}, - optimistic_confirmation_verifier::OptimisticConfirmationVerifier, - replay_stage::DUPLICATE_THRESHOLD, - validator::ValidatorConfig, -}; -use solana_download_utils::download_snapshot; -use solana_gossip::{ - cluster_info::VALIDATOR_PORT_RANGE, - crds::Cursor, - gossip_service::{self, discover_cluster}, -}; -use solana_ledger::{ - ancestor_iterator::AncestorIterator, - blockstore::{Blockstore, PurgeType}, - blockstore_db::AccessType, - leader_schedule::FixedSchedule, - leader_schedule::LeaderSchedule, -}; -use solana_local_cluster::{ - cluster::{Cluster, ClusterValidatorInfo}, - cluster_tests, - local_cluster::{ClusterConfig, LocalCluster}, - validator_configs::*, -}; -use solana_runtime::{ - snapshot_config::SnapshotConfig, - snapshot_utils::{self, ArchiveFormat}, -}; -use solana_sdk::{ - account::AccountSharedData, - client::{AsyncClient, SyncClient}, - clock::{self, Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES}, - commitment_config::CommitmentConfig, - epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, - genesis_config::ClusterType, - hash::Hash, - poh_config::PohConfig, - pubkey::Pubkey, - signature::{Keypair, Signer}, - system_program, system_transaction, -}; -use solana_streamer::socket::SocketAddrSpace; -use solana_vote_program::{vote_state::MAX_LOCKOUT_HISTORY, vote_transaction}; -use std::{ - collections::{BTreeSet, HashMap, HashSet}, - fs, - io::Read, - iter, - path::{Path, PathBuf}, - sync::atomic::{AtomicBool, Ordering}, - sync::Arc, - thread::{sleep, Builder, JoinHandle}, - time::{Duration, Instant}, -}; -use tempfile::TempDir; const RUST_LOG_FILTER: &str = "error,solana_core::replay_stage=warn,solana_local_cluster=info,local_cluster=info"; @@ -2484,6 +2488,8 @@ fn test_validator_saves_tower() { .ledger_path .clone(); + let file_tower_storage = FileTowerStorage::new(ledger_path.clone()); + // Wait for some votes to be generated let mut last_replayed_root; loop { @@ -2500,7 +2506,7 @@ fn test_validator_saves_tower() { // Stop validator and check saved tower let validator_info = cluster.exit_node(&validator_id); - let tower1 = Tower::restore(&ledger_path, &validator_id).unwrap(); + let tower1 = Tower::restore(&file_tower_storage, &validator_id).unwrap(); trace!("tower1: {:?}", tower1); assert_eq!(tower1.root(), 0); @@ -2528,14 +2534,16 @@ fn test_validator_saves_tower() { .get_slot_with_commitment(CommitmentConfig::processed()) .unwrap(); let validator_info = cluster.exit_node(&validator_id); - let tower2 = Tower::restore(&ledger_path, &validator_id).unwrap(); + let tower2 = Tower::restore(&file_tower_storage, &validator_id).unwrap(); trace!("tower2: {:?}", tower2); assert_eq!(tower2.root(), last_replayed_root); last_replayed_root = recent_slot; // Rollback saved tower to `tower1` to simulate a validator starting from a newer snapshot // without having to wait for that snapshot to be generated in this test - tower1.save(&validator_identity_keypair).unwrap(); + tower1 + .save(&file_tower_storage, &validator_identity_keypair) + .unwrap(); cluster.restart_node(&validator_id, validator_info, SocketAddrSpace::Unspecified); let validator_client = cluster.get_validator_client(&validator_id).unwrap(); @@ -2560,7 +2568,7 @@ fn test_validator_saves_tower() { // Check the new root is reflected in the saved tower state let mut validator_info = cluster.exit_node(&validator_id); - let tower3 = Tower::restore(&ledger_path, &validator_id).unwrap(); + let tower3 = Tower::restore(&file_tower_storage, &validator_id).unwrap(); trace!("tower3: {:?}", tower3); assert!(tower3.root() > last_replayed_root); @@ -2588,7 +2596,7 @@ fn test_validator_saves_tower() { cluster.close_preserve_ledgers(); - let tower4 = Tower::restore(&ledger_path, &validator_id).unwrap(); + let tower4 = Tower::restore(&file_tower_storage, &validator_id).unwrap(); trace!("tower4: {:?}", tower4); // should tower4 advance 1 slot compared to tower3???? assert_eq!(tower4.root(), tower3.root() + 1); @@ -2606,8 +2614,10 @@ fn purge_slots(blockstore: &Blockstore, start_slot: Slot, slot_count: Slot) { blockstore.purge_slots(start_slot, start_slot + slot_count, PurgeType::Exact); } -fn restore_tower(ledger_path: &Path, node_pubkey: &Pubkey) -> Option { - let tower = Tower::restore(ledger_path, node_pubkey); +fn restore_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option { + let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf()); + + let tower = Tower::restore(&file_tower_storage, node_pubkey); if let Err(tower_err) = tower { if tower_err.is_file_missing() { return None; @@ -2616,19 +2626,20 @@ fn restore_tower(ledger_path: &Path, node_pubkey: &Pubkey) -> Option { } } // actually saved tower must have at least one vote. - Tower::restore(ledger_path, node_pubkey).ok() + Tower::restore(&file_tower_storage, node_pubkey).ok() } -fn last_vote_in_tower(ledger_path: &Path, node_pubkey: &Pubkey) -> Option<(Slot, Hash)> { - restore_tower(ledger_path, node_pubkey).map(|tower| tower.last_voted_slot_hash().unwrap()) +fn last_vote_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<(Slot, Hash)> { + restore_tower(tower_path, node_pubkey).map(|tower| tower.last_voted_slot_hash().unwrap()) } -fn root_in_tower(ledger_path: &Path, node_pubkey: &Pubkey) -> Option { - restore_tower(ledger_path, node_pubkey).map(|tower| tower.root()) +fn root_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option { + restore_tower(tower_path, node_pubkey).map(|tower| tower.root()) } -fn remove_tower(ledger_path: &Path, node_pubkey: &Pubkey) { - fs::remove_file(Tower::get_filename(ledger_path, node_pubkey)).unwrap(); +fn remove_tower(tower_path: &Path, node_pubkey: &Pubkey) { + let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf()); + fs::remove_file(file_tower_storage.filename(node_pubkey)).unwrap(); } // A bit convoluted test case; but this roughly follows this test theoretical scenario: diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 9dd3a5f937..01c1aae017 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -5,7 +5,10 @@ use { jsonrpc_ipc_server::{RequestContext, ServerBuilder}, jsonrpc_server_utils::tokio, log::*, - solana_core::validator::ValidatorStartProgress, + solana_core::{ + consensus::{Tower, TowerStorage}, + validator::ValidatorStartProgress, + }, solana_gossip::cluster_info::ClusterInfo, solana_sdk::{ exit::Exit, @@ -13,7 +16,7 @@ use { }, std::{ net::SocketAddr, - path::{Path, PathBuf}, + path::Path, sync::{Arc, RwLock}, thread::{self, Builder}, time::{Duration, SystemTime}, @@ -28,7 +31,7 @@ pub struct AdminRpcRequestMetadata { pub validator_exit: Arc>, pub authorized_voter_keypairs: Arc>>>, pub cluster_info: Arc>>>, - pub tower_path: PathBuf, + pub tower_storage: Arc, } impl Metadata for AdminRpcRequestMetadata {} @@ -147,13 +150,12 @@ impl AdminRpc for AdminRpcImpl { // Ensure a Tower exists for the new identity and exit gracefully. // ReplayStage will be less forgiving if it fails to load the new tower. - solana_core::consensus::Tower::restore(&meta.tower_path, &identity_keypair.pubkey()) - .map_err(|err| { - jsonrpc_core::error::Error::invalid_params(format!( - "Unable to load tower file for new identity: {}", - err - )) - })?; + Tower::restore(meta.tower_storage.as_ref(), &identity_keypair.pubkey()).map_err(|err| { + jsonrpc_core::error::Error::invalid_params(format!( + "Unable to load tower file for new identity: {}", + err + )) + })?; if let Some(cluster_info) = meta.cluster_info.read().unwrap().as_ref() { solana_metrics::set_host_id(identity_keypair.pubkey().to_string()); diff --git a/validator/src/bin/solana-test-validator.rs b/validator/src/bin/solana-test-validator.rs index ea9a0a331a..990658708e 100644 --- a/validator/src/bin/solana-test-validator.rs +++ b/validator/src/bin/solana-test-validator.rs @@ -8,6 +8,7 @@ use { }, }, solana_client::rpc_client::RpcClient, + solana_core::consensus::FileTowerStorage, solana_faucet::faucet::{run_local_faucet_with_port, FAUCET_PORT}, solana_rpc::rpc::JsonRpcConfig, solana_sdk::{ @@ -521,7 +522,7 @@ fn main() { validator_exit: genesis.validator_exit.clone(), authorized_voter_keypairs: genesis.authorized_voter_keypairs.clone(), cluster_info: admin_service_cluster_info.clone(), - tower_path: ledger_path.clone(), + tower_storage: Arc::new(FileTowerStorage::new(ledger_path.clone())), }, ); let dashboard = if output == Output::Dashboard { diff --git a/validator/src/main.rs b/validator/src/main.rs index 823f03acc7..719404ef86 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -20,6 +20,7 @@ use { rpc_request::MAX_MULTIPLE_ACCOUNTS, }, solana_core::{ + consensus::FileTowerStorage, ledger_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS}, tpu::DEFAULT_TPU_COALESCE_MS, validator::{ @@ -2286,11 +2287,13 @@ pub fn main() { .ok() .or_else(|| get_cluster_shred_version(&entrypoint_addrs)); + let tower_path = value_t!(matches, "tower", PathBuf) + .ok() + .unwrap_or_else(|| ledger_path.clone()); + let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), - tower_path: value_t!(matches, "tower", PathBuf) - .ok() - .or_else(|| Some(ledger_path.clone())), + tower_storage: Arc::new(FileTowerStorage::new(tower_path)), dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(), expected_genesis_hash: matches .value_of("expected_genesis_hash") @@ -2577,7 +2580,7 @@ pub fn main() { start_progress: start_progress.clone(), authorized_voter_keypairs: authorized_voter_keypairs.clone(), cluster_info: admin_service_cluster_info.clone(), - tower_path: validator_config.tower_path.clone().unwrap(), + tower_storage: validator_config.tower_storage.clone(), }, );