From fa04531c7a2175a1e61d7af3bf42117c5e33cd63 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Wed, 16 Jun 2021 10:57:52 -0700 Subject: [PATCH] Extricate RpcCompletedSlotsService from RetransmitStage --- core/src/replay_stage.rs | 51 ++++++++++++++------------ core/src/retransmit_stage.rs | 25 +++---------- core/src/tvu.rs | 13 ++----- core/src/validator.rs | 26 +++++++++---- rpc/src/rpc_completed_slots_service.rs | 31 +++++++--------- 5 files changed, 68 insertions(+), 78 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index cb8833cc17..158505ffa6 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -118,7 +118,7 @@ pub struct ReplayStageConfig { pub vote_account: Pubkey, pub authorized_voter_keypairs: Arc>>>, pub exit: Arc, - pub subscriptions: Arc, + pub rpc_subscriptions: Arc, pub leader_schedule_cache: Arc, pub latest_root_senders: Vec>, pub accounts_background_request_sender: AbsRequestSender, @@ -302,7 +302,7 @@ impl ReplayStage { vote_account, authorized_voter_keypairs, exit, - subscriptions, + rpc_subscriptions, leader_schedule_cache, latest_root_senders, accounts_background_request_sender, @@ -319,7 +319,7 @@ impl ReplayStage { let (lockouts_sender, commitment_service) = AggregateCommitmentService::new( &exit, block_commitment_cache.clone(), - subscriptions.clone(), + rpc_subscriptions.clone(), ); #[allow(clippy::cognitive_complexity)] @@ -363,7 +363,7 @@ impl ReplayStage { &blockstore, &bank_forks, &leader_schedule_cache, - &subscriptions, + &rpc_subscriptions, &mut progress, ); generate_new_bank_forks_time.stop(); @@ -386,7 +386,7 @@ impl ReplayStage { &replay_vote_sender, &bank_notification_sender, &rewards_recorder_sender, - &subscriptions, + &rpc_subscriptions, &mut duplicate_slots_tracker, &gossip_duplicate_confirmed_slots, &mut unfrozen_gossip_verified_vote_hashes, @@ -577,7 +577,7 @@ impl ReplayStage { &lockouts_sender, &accounts_background_request_sender, &latest_root_senders, - &subscriptions, + &rpc_subscriptions, &block_commitment_cache, &mut heaviest_subtree_fork_choice, &bank_notification_sender, @@ -667,7 +667,7 @@ impl ReplayStage { &bank_forks, &poh_recorder, &leader_schedule_cache, - &subscriptions, + &rpc_subscriptions, &progress, &retransmit_slots_sender, &mut skipped_slots_info, @@ -1069,7 +1069,7 @@ impl ReplayStage { bank_forks: &Arc>, poh_recorder: &Arc>, leader_schedule_cache: &Arc, - subscriptions: &Arc, + rpc_subscriptions: &Arc, progress_map: &ProgressMap, retransmit_slots_sender: &RetransmitSlotsSender, skipped_slots_info: &mut SkippedSlotsInfo, @@ -1181,7 +1181,7 @@ impl ReplayStage { poh_slot, root_slot, my_pubkey, - subscriptions, + rpc_subscriptions, ); let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank); @@ -1230,7 +1230,7 @@ impl ReplayStage { bank: &Bank, root: Slot, err: &BlockstoreProcessorError, - subscriptions: &Arc, + rpc_subscriptions: &Arc, duplicate_slots_tracker: &mut DuplicateSlotsTracker, gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots, progress: &mut ProgressMap, @@ -1264,7 +1264,7 @@ impl ReplayStage { blockstore .set_dead_slot(slot) .expect("Failed to mark slot as dead in blockstore"); - subscriptions.notify_slot_update(SlotUpdate::Dead { + rpc_subscriptions.notify_slot_update(SlotUpdate::Dead { slot, err: format!("error: {:?}", err), timestamp: timestamp(), @@ -1297,7 +1297,7 @@ impl ReplayStage { lockouts_sender: &Sender, accounts_background_request_sender: &AbsRequestSender, latest_root_senders: &[Sender], - subscriptions: &Arc, + rpc_subscriptions: &Arc, block_commitment_cache: &Arc>, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, bank_notification_sender: &Option, @@ -1356,7 +1356,7 @@ impl ReplayStage { has_new_vote_been_rooted, vote_signatures, ); - subscriptions.notify_roots(rooted_slots); + rpc_subscriptions.notify_roots(rooted_slots); if let Some(sender) = bank_notification_sender { sender .send(BankNotification::Root(root_bank)) @@ -1643,7 +1643,7 @@ impl ReplayStage { replay_vote_sender: &ReplayVoteSender, bank_notification_sender: &Option, rewards_recorder_sender: &Option, - subscriptions: &Arc, + rpc_subscriptions: &Arc, duplicate_slots_tracker: &mut DuplicateSlotsTracker, gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots, unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes, @@ -1714,7 +1714,7 @@ impl ReplayStage { &bank, root_slot, &err, - subscriptions, + rpc_subscriptions, duplicate_slots_tracker, gossip_duplicate_confirmed_slots, progress, @@ -2408,7 +2408,7 @@ impl ReplayStage { blockstore: &Blockstore, bank_forks: &RwLock, leader_schedule_cache: &Arc, - subscriptions: &Arc, + rpc_subscriptions: &Arc, progress: &mut ProgressMap, ) { // Find the next slot that chains to the old slot @@ -2453,7 +2453,7 @@ impl ReplayStage { child_slot, forks.root(), &leader, - subscriptions, + rpc_subscriptions, ); let empty: Vec = vec![]; Self::update_fork_propagated_threshold_from_votes( @@ -2479,9 +2479,9 @@ impl ReplayStage { slot: u64, root_slot: u64, leader: &Pubkey, - subscriptions: &Arc, + rpc_subscriptions: &Arc, ) -> Bank { - subscriptions.notify_slot(slot, parent.slot(), root_slot); + rpc_subscriptions.notify_slot(slot, parent.slot(), root_slot); Bank::new_from_parent(parent, leader, slot) } @@ -3191,7 +3191,7 @@ mod tests { &&VerifyRecyclers::default(), ); - let subscriptions = Arc::new(RpcSubscriptions::new( + let rpc_subscriptions = Arc::new(RpcSubscriptions::new( &exit, bank_forks.clone(), block_commitment_cache, @@ -3203,7 +3203,7 @@ mod tests { &bank0, 0, err, - &subscriptions, + &rpc_subscriptions, &mut DuplicateSlotsTracker::default(), &GossipDuplicateConfirmedSlots::default(), &mut progress, @@ -3254,14 +3254,17 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); - let subscriptions = Arc::new(RpcSubscriptions::new( + let rpc_subscriptions = Arc::new(RpcSubscriptions::new( &exit, bank_forks.clone(), block_commitment_cache.clone(), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), )); - let (lockouts_sender, _) = - AggregateCommitmentService::new(&exit, block_commitment_cache.clone(), subscriptions); + let (lockouts_sender, _) = AggregateCommitmentService::new( + &exit, + block_commitment_cache.clone(), + rpc_subscriptions, + ); assert!(block_commitment_cache .read() diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 499a8f28ae..b0f293f785 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -18,17 +18,11 @@ use solana_gossip::{ contact_info::ContactInfo, }; use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats}; -use solana_ledger::{ - blockstore::{Blockstore, CompletedSlotsReceiver}, - leader_schedule_cache::LeaderScheduleCache, -}; +use solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache}; use solana_measure::measure::Measure; use solana_metrics::inc_new_counter_error; use solana_perf::packet::{Packet, Packets}; -use solana_rpc::{ - max_slots::MaxSlots, rpc_completed_slots_service::RpcCompletedSlotsService, - rpc_subscriptions::RpcSubscriptions, -}; +use solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}; use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp}; use solana_streamer::streamer::PacketReceiver; @@ -602,7 +596,6 @@ impl RetransmitStage { repair_socket: Arc, verified_receiver: Receiver>, exit: &Arc, - rpc_completed_slots_receiver: CompletedSlotsReceiver, cluster_slots_update_receiver: ClusterSlotsUpdateReceiver, epoch_schedule: EpochSchedule, cfg: Option>, @@ -619,18 +612,16 @@ impl RetransmitStage { let (retransmit_sender, retransmit_receiver) = channel(); let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver)); - let t_retransmit = retransmitter( + let thread_hdls = retransmitter( retransmit_sockets, bank_forks.clone(), leader_schedule_cache, cluster_info.clone(), retransmit_receiver, max_slots, - rpc_subscriptions.clone(), + rpc_subscriptions, ); - let rpc_completed_slots_hdl = - RpcCompletedSlotsService::spawn(rpc_completed_slots_receiver, rpc_subscriptions); let cluster_slots_service = ClusterSlotsService::new( blockstore.clone(), cluster_slots.clone(), @@ -677,11 +668,6 @@ impl RetransmitStage { duplicate_slots_sender, ); - let mut thread_hdls = t_retransmit; - if let Some(thread_hdl) = rpc_completed_slots_hdl { - thread_hdls.push(thread_hdl); - } - Self { thread_hdls, window_service, @@ -751,7 +737,7 @@ mod tests { let cluster_info = Arc::new(cluster_info); let (retransmit_sender, retransmit_receiver) = channel(); - let t_retransmit = retransmitter( + let _t_retransmit = retransmitter( retransmit_socket, bank_forks, &leader_schedule_cache, @@ -760,7 +746,6 @@ mod tests { &Arc::new(MaxSlots::default()), None, ); - let _thread_hdls = vec![t_retransmit]; let mut shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0); let mut packet = Packet::default(); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 391240b9c8..7467c3ba22 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -25,8 +25,7 @@ use crate::{ use crossbeam_channel::unbounded; use solana_gossip::cluster_info::ClusterInfo; use solana_ledger::{ - blockstore::{Blockstore, CompletedSlotsReceiver}, - blockstore_processor::TransactionStatusSender, + blockstore::Blockstore, blockstore_processor::TransactionStatusSender, leader_schedule_cache::LeaderScheduleCache, }; use solana_poh::poh_recorder::PohRecorder; @@ -109,12 +108,11 @@ impl Tvu { sockets: Sockets, blockstore: Arc, ledger_signal_receiver: Receiver, - subscriptions: &Arc, + rpc_subscriptions: &Arc, poh_recorder: &Arc>, tower: Tower, leader_schedule_cache: &Arc, exit: &Arc, - completed_slots_receiver: CompletedSlotsReceiver, block_commitment_cache: Arc>, cfg: Option>, transaction_status_sender: Option, @@ -179,7 +177,6 @@ impl Tvu { repair_socket, verified_receiver, &exit, - completed_slots_receiver, cluster_slots_update_receiver, *bank_forks.read().unwrap().working_bank().epoch_schedule(), cfg, @@ -190,7 +187,7 @@ impl Tvu { tvu_config.repair_validators, completed_data_sets_sender, max_slots, - Some(subscriptions.clone()), + Some(rpc_subscriptions.clone()), duplicate_slots_sender, ); @@ -266,7 +263,7 @@ impl Tvu { vote_account: *vote_account, authorized_voter_keypairs, exit: exit.clone(), - subscriptions: subscriptions.clone(), + rpc_subscriptions: rpc_subscriptions.clone(), leader_schedule_cache: leader_schedule_cache.clone(), latest_root_senders: vec![ledger_cleanup_slot_sender], accounts_background_request_sender, @@ -381,7 +378,6 @@ pub mod tests { let BlockstoreSignals { blockstore, ledger_signal_receiver, - completed_slots_receiver, .. } = Blockstore::open_with_signal(&blockstore_path, None, true) .expect("Expected to successfully open ledger"); @@ -425,7 +421,6 @@ pub mod tests { tower, &leader_schedule_cache, &exit, - completed_slots_receiver, block_commitment_cache, None, None, diff --git a/core/src/validator.rs b/core/src/validator.rs index 5dd604e619..ada7c9200e 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -47,6 +47,7 @@ use solana_rpc::{ OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker, }, rpc::JsonRpcConfig, + rpc_completed_slots_service::RpcCompletedSlotsService, rpc_pubsub_service::{PubSubConfig, PubSubService}, rpc_service::JsonRpcService, rpc_subscriptions::RpcSubscriptions, @@ -80,7 +81,7 @@ use std::{ sync::atomic::{AtomicBool, AtomicU64, Ordering}, sync::mpsc::Receiver, sync::{Arc, Mutex, RwLock}, - thread::{sleep, Builder}, + thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, }; @@ -243,6 +244,7 @@ pub struct Validator { validator_exit: Arc>, json_rpc_service: Option, pubsub_service: Option, + rpc_completed_slots_service: JoinHandle<()>, optimistically_confirmed_bank_tracker: Option, transaction_status_service: Option, rewards_recorder_service: Option, @@ -446,7 +448,7 @@ impl Validator { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); - let subscriptions = Arc::new(RpcSubscriptions::new_with_vote_subscription( + let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_vote_subscription( &exit, bank_forks.clone(), block_commitment_cache.clone(), @@ -460,7 +462,7 @@ impl Validator { let completed_data_sets_service = CompletedDataSetsService::new( completed_data_sets_receiver, blockstore.clone(), - subscriptions.clone(), + rpc_subscriptions.clone(), &exit, max_slots.clone(), ); @@ -540,7 +542,7 @@ impl Validator { } else { Some(PubSubService::new( config.pubsub_config.clone(), - &subscriptions, + &rpc_subscriptions, rpc_pubsub_addr, &exit, )) @@ -550,7 +552,7 @@ impl Validator { &exit, bank_forks.clone(), optimistically_confirmed_bank, - subscriptions.clone(), + rpc_subscriptions.clone(), )), Some(bank_notification_sender), ) @@ -660,6 +662,10 @@ impl Validator { let (verified_vote_sender, verified_vote_receiver) = unbounded(); let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded(); let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded(); + + let rpc_completed_slots_service = + RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone()); + let tvu = Tvu::new( vote_account, authorized_voter_keypairs, @@ -692,12 +698,11 @@ impl Validator { }, blockstore.clone(), ledger_signal_receiver, - &subscriptions, + &rpc_subscriptions, &poh_recorder, tower, &leader_schedule_cache, &exit, - completed_slots_receiver, block_commitment_cache, config.enable_partition.clone(), transaction_status_sender.clone(), @@ -740,7 +745,7 @@ impl Validator { node.sockets.tpu, node.sockets.tpu_forwards, node.sockets.broadcast, - &subscriptions, + &rpc_subscriptions, transaction_status_sender, &blockstore, &config.broadcast_stage_type, @@ -765,6 +770,7 @@ impl Validator { serve_repair_service, json_rpc_service, pubsub_service, + rpc_completed_slots_service, optimistically_confirmed_bank_tracker, transaction_status_service, rewards_recorder_service, @@ -828,6 +834,10 @@ impl Validator { pubsub_service.join().expect("pubsub_service"); } + self.rpc_completed_slots_service + .join() + .expect("rpc_completed_slots_service"); + if let Some(optimistically_confirmed_bank_tracker) = self.optimistically_confirmed_bank_tracker { diff --git a/rpc/src/rpc_completed_slots_service.rs b/rpc/src/rpc_completed_slots_service.rs index 83ccd28d2e..0f9d393e61 100644 --- a/rpc/src/rpc_completed_slots_service.rs +++ b/rpc/src/rpc_completed_slots_service.rs @@ -13,23 +13,20 @@ pub struct RpcCompletedSlotsService; impl RpcCompletedSlotsService { pub fn spawn( completed_slots_receiver: CompletedSlotsReceiver, - rpc_subscriptions: Option>, - ) -> Option> { - let rpc_subscriptions = rpc_subscriptions?; - Some( - Builder::new() - .name("solana-rpc-completed-slots-service".to_string()) - .spawn(move || { - for slots in completed_slots_receiver.iter() { - for slot in slots { - rpc_subscriptions.notify_slot_update(SlotUpdate::Completed { - slot, - timestamp: timestamp(), - }); - } + rpc_subscriptions: Arc, + ) -> JoinHandle<()> { + Builder::new() + .name("solana-rpc-completed-slots-service".to_string()) + .spawn(move || { + for slots in completed_slots_receiver.iter() { + for slot in slots { + rpc_subscriptions.notify_slot_update(SlotUpdate::Completed { + slot, + timestamp: timestamp(), + }); } - }) - .unwrap(), - ) + } + }) + .unwrap() } }