From 9a80e31bae2f9284e635b6d658d26b6ad766b298 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Tue, 14 Jul 2020 21:14:48 -0600 Subject: [PATCH] Expose tss to the other blockstore_processor path (#11070) --- core/src/validator.rs | 107 ++++++++++++++++++----------- ledger-tool/src/main.rs | 1 + ledger/src/bank_forks_utils.rs | 3 + ledger/src/blockstore_processor.rs | 37 +++++++--- 4 files changed, 99 insertions(+), 49 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index 419b2b15c5..23b240ffe4 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -8,7 +8,7 @@ use crate::{ gossip_service::{discover_cluster, GossipService}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, poh_service::PohService, - rewards_recorder_service::RewardsRecorderService, + rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService}, rpc::JsonRpcConfig, rpc_pubsub_service::PubSubService, rpc_service::JsonRpcService, @@ -27,7 +27,8 @@ use solana_ledger::{ bank_forks_utils, blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType}, blockstore_db::BlockstoreRecoveryMode, - blockstore_processor, create_new_tmp_ledger, + blockstore_processor::{self, TransactionStatusSender}, + create_new_tmp_ledger, leader_schedule::FixedSchedule, leader_schedule_cache::LeaderScheduleCache, }; @@ -135,6 +136,14 @@ impl ValidatorExit { } } +#[derive(Default)] +struct TransactionHistoryServices { + transaction_status_sender: Option, + transaction_status_service: Option, + rewards_recorder_sender: Option, + rewards_recorder_service: Option, +} + pub struct Validator { pub id: Pubkey, validator_exit: Arc>>, @@ -203,6 +212,12 @@ impl Validator { cleanup_accounts_path(accounts_path); } + let mut validator_exit = ValidatorExit::default(); + let exit = Arc::new(AtomicBool::new(false)); + let exit_ = exit.clone(); + validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed))); + let validator_exit = Arc::new(RwLock::new(Some(validator_exit))); + let ( genesis_config, bank_forks, @@ -211,7 +226,13 @@ impl Validator { completed_slots_receiver, leader_schedule_cache, snapshot_hash, - ) = new_banks_from_ledger(config, ledger_path, poh_verify); + TransactionHistoryServices { + transaction_status_sender, + transaction_status_service, + rewards_recorder_sender, + rewards_recorder_service, + }, + ) = new_banks_from_ledger(config, ledger_path, poh_verify, &exit); let leader_schedule_cache = Arc::new(leader_schedule_cache); let bank = bank_forks.working_bank(); @@ -243,14 +264,7 @@ impl Validator { } } - let mut validator_exit = ValidatorExit::default(); - let exit = Arc::new(AtomicBool::new(false)); - let exit_ = exit.clone(); - validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed))); - let validator_exit = Arc::new(RwLock::new(Some(validator_exit))); - let cluster_info = Arc::new(ClusterInfo::new(node.info.clone(), keypair.clone())); - let blockstore = Arc::new(blockstore); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let subscriptions = Arc::new(RpcSubscriptions::new( @@ -291,36 +305,6 @@ impl Validator { ) }); - let (transaction_status_sender, transaction_status_service) = - if rpc_service.is_some() && config.rpc_config.enable_rpc_transaction_history { - let (transaction_status_sender, transaction_status_receiver) = unbounded(); - ( - Some(transaction_status_sender), - Some(TransactionStatusService::new( - transaction_status_receiver, - blockstore.clone(), - &exit, - )), - ) - } else { - (None, None) - }; - - let (rewards_recorder_sender, rewards_recorder_service) = - if rpc_service.is_some() && config.rpc_config.enable_rpc_transaction_history { - let (rewards_recorder_sender, rewards_receiver) = unbounded(); - ( - Some(rewards_recorder_sender), - Some(RewardsRecorderService::new( - rewards_receiver, - blockstore.clone(), - &exit, - )), - ) - } else { - (None, None) - }; - info!( "Starting PoH: epoch={} slot={} tick_height={} blockhash={} leader={:?}", bank.epoch(), @@ -579,14 +563,16 @@ fn new_banks_from_ledger( config: &ValidatorConfig, ledger_path: &Path, poh_verify: bool, + exit: &Arc, ) -> ( GenesisConfig, BankForks, - Blockstore, + Arc, Receiver, CompletedSlotsReceiver, LeaderScheduleCache, Option<(Slot, Hash)>, + TransactionHistoryServices, ) { info!("loading ledger from {:?}...", ledger_path); let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size); @@ -622,12 +608,23 @@ fn new_banks_from_ledger( ..blockstore_processor::ProcessOptions::default() }; + let blockstore = Arc::new(blockstore); + let transaction_history_services = + if config.rpc_ports.is_some() && config.rpc_config.enable_rpc_transaction_history { + initialize_rpc_transaction_history_services(blockstore.clone(), exit) + } else { + TransactionHistoryServices::default() + }; + let (mut bank_forks, mut leader_schedule_cache, snapshot_hash) = bank_forks_utils::load( &genesis_config, &blockstore, config.account_paths.clone(), config.snapshot_config.as_ref(), process_options, + transaction_history_services + .transaction_status_sender + .clone(), ) .unwrap_or_else(|err| { error!("Failed to load ledger: {:?}", err); @@ -647,6 +644,7 @@ fn new_banks_from_ledger( completed_slots_receiver, leader_schedule_cache, snapshot_hash, + transaction_history_services, ) } @@ -710,6 +708,33 @@ fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_versi drop(blockstore); } +fn initialize_rpc_transaction_history_services( + blockstore: Arc, + exit: &Arc, +) -> TransactionHistoryServices { + let (transaction_status_sender, transaction_status_receiver) = unbounded(); + let transaction_status_sender = Some(transaction_status_sender); + let transaction_status_service = Some(TransactionStatusService::new( + transaction_status_receiver, + blockstore.clone(), + exit, + )); + + let (rewards_recorder_sender, rewards_receiver) = unbounded(); + let rewards_recorder_sender = Some(rewards_recorder_sender); + let rewards_recorder_service = Some(RewardsRecorderService::new( + rewards_receiver, + blockstore, + exit, + )); + TransactionHistoryServices { + transaction_status_sender, + transaction_status_service, + rewards_recorder_sender, + rewards_recorder_service, + } +} + // Return true on error, indicating the validator should exit. fn wait_for_supermajority( config: &ValidatorConfig, diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 36afe44c5d..bedb0620c4 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -680,6 +680,7 @@ fn load_bank_forks( account_paths, snapshot_config.as_ref(), process_options, + None, ) } diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 9d6cea4fe1..4002b7898d 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -2,6 +2,7 @@ use crate::{ blockstore::Blockstore, blockstore_processor::{ self, BlockstoreProcessorError, BlockstoreProcessorResult, ProcessOptions, + TransactionStatusSender, }, entry::VerifyRecyclers, leader_schedule_cache::LeaderScheduleCache, @@ -34,6 +35,7 @@ pub fn load( account_paths: Vec, snapshot_config: Option<&SnapshotConfig>, process_options: ProcessOptions, + transaction_status_sender: Option, ) -> LoadResult { if let Some(snapshot_config) = snapshot_config.as_ref() { info!( @@ -86,6 +88,7 @@ pub fn load( Arc::new(deserialized_bank), &process_options, &VerifyRecyclers::default(), + transaction_status_sender, ), Some(deserialized_snapshot_hash), ); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 8ea7634776..d1fe54a03e 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -294,7 +294,7 @@ pub fn process_blockstore( info!("processing ledger for slot 0..."); let recyclers = VerifyRecyclers::default(); process_bank_0(&bank0, blockstore, &opts, &recyclers)?; - process_blockstore_from_root(genesis_config, blockstore, bank0, &opts, &recyclers) + process_blockstore_from_root(genesis_config, blockstore, bank0, &opts, &recyclers, None) } // Process blockstore from a known root bank @@ -304,6 +304,7 @@ pub fn process_blockstore_from_root( bank: Arc, opts: &ProcessOptions, recyclers: &VerifyRecyclers, + transaction_status_sender: Option, ) -> BlockstoreProcessorResult { info!("processing ledger from slot {}...", bank.slot()); let allocated = thread_mem_usage::Allocatedp::default(); @@ -368,6 +369,7 @@ pub fn process_blockstore_from_root( &mut root, opts, recyclers, + transaction_status_sender, )?; (initial_forks, leader_schedule_cache) } else { @@ -456,6 +458,7 @@ fn confirm_full_slot( opts: &ProcessOptions, recyclers: &VerifyRecyclers, progress: &mut ConfirmationProgress, + transaction_status_sender: Option, ) -> result::Result<(), BlockstoreProcessorError> { let mut timing = ConfirmationTiming::default(); let skip_verification = !opts.poh_verify; @@ -465,7 +468,7 @@ fn confirm_full_slot( &mut timing, progress, skip_verification, - None, + transaction_status_sender, opts.entry_callback.as_ref(), recyclers, )?; @@ -629,7 +632,7 @@ fn process_bank_0( ) -> result::Result<(), BlockstoreProcessorError> { assert_eq!(bank0.slot(), 0); let mut progress = ConfirmationProgress::new(bank0.last_blockhash()); - confirm_full_slot(blockstore, bank0, opts, recyclers, &mut progress) + confirm_full_slot(blockstore, bank0, opts, recyclers, &mut progress, None) .expect("processing for bank 0 must succeed"); bank0.freeze(); Ok(()) @@ -702,6 +705,7 @@ fn load_frozen_forks( root: &mut Slot, opts: &ProcessOptions, recyclers: &VerifyRecyclers, + transaction_status_sender: Option, ) -> result::Result>, BlockstoreProcessorError> { let mut initial_forks = HashMap::new(); let mut last_status_report = Instant::now(); @@ -741,7 +745,16 @@ fn load_frozen_forks( let initial_allocation = allocated.get(); let mut progress = ConfirmationProgress::new(last_entry_hash); - if process_single_slot(blockstore, &bank, opts, recyclers, &mut progress).is_err() { + if process_single_slot( + blockstore, + &bank, + opts, + recyclers, + &mut progress, + transaction_status_sender.clone(), + ) + .is_err() + { continue; } txs += progress.num_txs; @@ -788,10 +801,11 @@ fn process_single_slot( opts: &ProcessOptions, recyclers: &VerifyRecyclers, progress: &mut ConfirmationProgress, + transaction_status_sender: Option, ) -> result::Result<(), BlockstoreProcessorError> { // Mark corrupt slots as dead so validators don't replay this slot and // see DuplicateSignature errors later in ReplayStage - confirm_full_slot(blockstore, bank, opts, recyclers, progress).map_err(|err| { + confirm_full_slot(blockstore, bank, opts, recyclers, progress, transaction_status_sender).map_err(|err| { let slot = bank.slot(); warn!("slot {} failed to verify: {}", slot, err); if blockstore.is_primary_access() { @@ -2418,14 +2432,21 @@ pub mod tests { &opts, &recyclers, &mut ConfirmationProgress::new(bank0.last_blockhash()), + None, ) .unwrap(); bank1.squash(); // Test process_blockstore_from_root() from slot 1 onwards - let (bank_forks, _leader_schedule) = - process_blockstore_from_root(&genesis_config, &blockstore, bank1, &opts, &recyclers) - .unwrap(); + let (bank_forks, _leader_schedule) = process_blockstore_from_root( + &genesis_config, + &blockstore, + bank1, + &opts, + &recyclers, + None, + ) + .unwrap(); assert_eq!(frozen_bank_slots(&bank_forks), vec![5, 6]); assert_eq!(bank_forks.working_bank().slot(), 6);