diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index ce9cf66015..46cf76f8b4 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -25,7 +25,7 @@ use solana_sdk::{
     clock::Slot,
     hash::Hash,
     pubkey::Pubkey,
-    signature::KeypairUtil,
+    signature::{Keypair, KeypairUtil},
     timing::{self, duration_as_ms},
    transaction::Transaction,
 };
@@ -64,6 +64,24 @@ impl Drop for Finalizer {
     }
 }
 
+pub struct ReplayStageConfig {
+    pub my_pubkey: Pubkey,
+    pub vote_account: Pubkey,
+    pub voting_keypair: Option<Arc<Keypair>>,
+    pub blocktree: Arc<Blocktree>,
+    pub bank_forks: Arc<RwLock<BankForks>>,
+    pub cluster_info: Arc<RwLock<ClusterInfo>>,
+    pub exit: Arc<AtomicBool>,
+    pub ledger_signal_receiver: Receiver<bool>,
+    pub subscriptions: Arc<RpcSubscriptions>,
+    pub poh_recorder: Arc<Mutex<PohRecorder>>,
+    pub leader_schedule_cache: Arc<LeaderScheduleCache>,
+    pub slot_full_senders: Vec<Sender<(u64, Pubkey)>>,
+    pub snapshot_package_sender: Option<SnapshotPackageSender>,
+    pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
+    pub transaction_status_sender: Option<TransactionStatusSender>,
+}
+
 pub struct ReplayStage {
     t_replay: JoinHandle<Result<()>>,
     commitment_service: AggregateCommitmentService,
@@ -162,51 +180,39 @@ impl ForkProgress {
     }
 }
 
 impl ReplayStage {
-    #[allow(
-        clippy::new_ret_no_self,
-        clippy::too_many_arguments,
-        clippy::type_complexity
-    )]
-    pub fn new<T>(
-        my_pubkey: &Pubkey,
-        vote_account: &Pubkey,
-        voting_keypair: Option<&Arc<T>>,
-        blocktree: Arc<Blocktree>,
-        bank_forks: &Arc<RwLock<BankForks>>,
-        cluster_info: Arc<RwLock<ClusterInfo>>,
-        exit: &Arc<AtomicBool>,
-        ledger_signal_receiver: Receiver<bool>,
-        subscriptions: &Arc<RpcSubscriptions>,
-        poh_recorder: &Arc<Mutex<PohRecorder>>,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
-        slot_full_senders: Vec<Sender<(u64, Pubkey)>>,
-        snapshot_package_sender: Option<SnapshotPackageSender>,
-        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
-        transaction_status_sender: Option<TransactionStatusSender>,
-    ) -> (Self, Receiver<Vec<Arc<Bank>>>)
-    where
-        T: 'static + KeypairUtil + Send + Sync,
-    {
+    #[allow(clippy::new_ret_no_self)]
+    pub fn new(config: ReplayStageConfig) -> (Self, Receiver<Vec<Arc<Bank>>>) {
+        let ReplayStageConfig {
+            my_pubkey,
+            vote_account,
+            voting_keypair,
+            blocktree,
+            bank_forks,
+            cluster_info,
+            exit,
+            ledger_signal_receiver,
+            subscriptions,
+            poh_recorder,
+            leader_schedule_cache,
+            slot_full_senders,
+            snapshot_package_sender,
+            block_commitment_cache,
+            transaction_status_sender,
+        } = config;
+
         let (root_bank_sender, root_bank_receiver) = channel();
         trace!("replay stage");
-        let exit_ = exit.clone();
-        let subscriptions = subscriptions.clone();
-        let bank_forks = bank_forks.clone();
-        let poh_recorder = poh_recorder.clone();
-        let my_pubkey = *my_pubkey;
         let mut tower = Tower::new(&my_pubkey, &vote_account, &bank_forks.read().unwrap());
+
         // Start the replay stage loop
-        let leader_schedule_cache = leader_schedule_cache.clone();
-        let vote_account = *vote_account;
-        let voting_keypair = voting_keypair.cloned();
         let (lockouts_sender, commitment_service) =
-            AggregateCommitmentService::new(exit, block_commitment_cache);
+            AggregateCommitmentService::new(&exit, block_commitment_cache);
 
         let t_replay = Builder::new()
             .name("solana-replay-stage".to_string())
             .spawn(move || {
-                let _exit = Finalizer::new(exit_.clone());
+                let _exit = Finalizer::new(exit.clone());
                 let mut progress = HashMap::new();
                 // Initialize progress map with any root banks
                 for bank in bank_forks.read().unwrap().frozen_banks().values() {
@@ -224,7 +230,7 @@ impl ReplayStage {
                     thread_mem_usage::datapoint("solana-replay-stage");
                     let now = Instant::now();
                     // Stop getting entries if we get exit signal
-                    if exit_.load(Ordering::Relaxed) {
+                    if exit.load(Ordering::Relaxed) {
                         break;
                     }
 
@@ -593,13 +599,13 @@ impl ReplayStage {
     }
 
     #[allow(clippy::too_many_arguments)]
-    fn handle_votable_bank<T>(
+    fn handle_votable_bank(
         bank: &Arc<Bank>,
         bank_forks: &Arc<RwLock<BankForks>>,
         tower: &mut Tower,
         progress: &mut HashMap<u64, ForkProgress>,
         vote_account: &Pubkey,
-        voting_keypair: &Option<Arc<T>>,
+        voting_keypair: &Option<Arc<Keypair>>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         blocktree: &Arc<Blocktree>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
@@ -607,10 +613,7 @@ impl ReplayStage {
         total_staked: u64,
         lockouts_sender: &Sender<CommitmentAggregationData>,
         snapshot_package_sender: &Option<SnapshotPackageSender>,
-    ) -> Result<()>
-    where
-        T: 'static + KeypairUtil + Send + Sync,
-    {
+    ) -> Result<()> {
         if bank.is_empty() {
             inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
         }
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 456b12178c..e1937d136d 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -8,7 +8,7 @@ use crate::{
     ledger_cleanup_service::LedgerCleanupService,
     partition_cfg::PartitionCfg,
     poh_recorder::PohRecorder,
-    replay_stage::ReplayStage,
+    replay_stage::{ReplayStage, ReplayStageConfig},
     retransmit_stage::RetransmitStage,
     rpc_subscriptions::RpcSubscriptions,
     shred_fetch_stage::ShredFetchStage,
@@ -65,9 +65,9 @@ impl Tvu {
     /// * `sockets` - fetch, repair, and retransmit sockets
     /// * `blocktree` - the ledger itself
     #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
-    pub fn new<T>(
+    pub fn new(
         vote_account: &Pubkey,
-        voting_keypair: Option<&Arc<T>>,
+        voting_keypair: Option<Arc<Keypair>>,
         storage_keypair: &Arc<Keypair>,
         bank_forks: &Arc<RwLock<BankForks>>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
@@ -87,10 +87,7 @@ impl Tvu {
         cfg: Option<PartitionCfg>,
         shred_version: u16,
         transaction_status_sender: Option<TransactionStatusSender>,
-    ) -> Self
-    where
-        T: 'static + KeypairUtil + Sync + Send,
-    {
+    ) -> Self {
         let keypair: Arc<Keypair> = cluster_info
             .read()
             .expect("Unable to read from cluster_info during Tvu creation")
@@ -162,23 +159,25 @@ impl Tvu {
             }
         };
 
-        let (replay_stage, root_bank_receiver) = ReplayStage::new(
-            &keypair.pubkey(),
-            vote_account,
+        let replay_stage_config = ReplayStageConfig {
+            my_pubkey: keypair.pubkey(),
+            vote_account: *vote_account,
             voting_keypair,
-            blocktree.clone(),
-            &bank_forks,
-            cluster_info.clone(),
-            &exit,
+            blocktree: blocktree.clone(),
+            bank_forks: bank_forks.clone(),
+            cluster_info: cluster_info.clone(),
+            exit: exit.clone(),
             ledger_signal_receiver,
-            subscriptions,
-            poh_recorder,
-            leader_schedule_cache,
-            vec![blockstream_slot_sender, ledger_cleanup_slot_sender],
+            subscriptions: subscriptions.clone(),
+            poh_recorder: poh_recorder.clone(),
+            leader_schedule_cache: leader_schedule_cache.clone(),
+            slot_full_senders: vec![blockstream_slot_sender, ledger_cleanup_slot_sender],
             snapshot_package_sender,
             block_commitment_cache,
             transaction_status_sender,
-        );
+        };
+
+        let (replay_stage, root_bank_receiver) = ReplayStage::new(replay_stage_config);
 
         let blockstream_service = if let Some(blockstream_unix_socket) = blockstream_unix_socket {
             let blockstream_service = BlockstreamService::new(
@@ -284,7 +283,7 @@ pub mod tests {
         let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
         let tvu = Tvu::new(
             &voting_keypair.pubkey(),
-            Some(&Arc::new(voting_keypair)),
+            Some(Arc::new(voting_keypair)),
             &storage_keypair,
             &Arc::new(RwLock::new(bank_forks)),
             &cref1,
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 7eda71f3c5..ff864e5070 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -350,7 +350,7 @@ impl Validator {
         let voting_keypair = if config.voting_disabled {
             None
         } else {
-            Some(voting_keypair)
+            Some(voting_keypair.clone())
         };
 
         let tvu = Tvu::new(