Extricate RpcCompletedSlotsService from RetransmitStage

Michael Vines 2021-06-16 10:57:52 -07:00
parent f859a39b86
commit fa04531c7a
5 changed files with 68 additions and 78 deletions
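This change moves the completed-slots notification thread out of RetransmitStage: the blockstore's CompletedSlotsReceiver is no longer threaded through Tvu::new() and into RetransmitStage::new(); instead, the validator spawns RpcCompletedSlotsService directly and joins its handle on shutdown. A minimal sketch of the new wiring, using the names that appear in the validator changes below (struct plumbing and surrounding setup elided):

    // In Validator::new(): spawn the service straight from the blockstore's
    // completed_slots_receiver, paired with the shared Arc<RpcSubscriptions>.
    let rpc_completed_slots_service =
        RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone());

    // In Validator::join(): the handle, stored on the Validator struct, is
    // joined unconditionally at shutdown.
    self.rpc_completed_slots_service
        .join()
        .expect("rpc_completed_slots_service");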

View File

@@ -118,7 +118,7 @@ pub struct ReplayStageConfig {
pub vote_account: Pubkey,
pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
pub exit: Arc<AtomicBool>,
pub subscriptions: Arc<RpcSubscriptions>,
pub rpc_subscriptions: Arc<RpcSubscriptions>,
pub leader_schedule_cache: Arc<LeaderScheduleCache>,
pub latest_root_senders: Vec<Sender<Slot>>,
pub accounts_background_request_sender: AbsRequestSender,
@@ -302,7 +302,7 @@ impl ReplayStage {
vote_account,
authorized_voter_keypairs,
exit,
subscriptions,
rpc_subscriptions,
leader_schedule_cache,
latest_root_senders,
accounts_background_request_sender,
@@ -319,7 +319,7 @@ impl ReplayStage {
let (lockouts_sender, commitment_service) = AggregateCommitmentService::new(
&exit,
block_commitment_cache.clone(),
subscriptions.clone(),
rpc_subscriptions.clone(),
);
#[allow(clippy::cognitive_complexity)]
@@ -363,7 +363,7 @@
&blockstore,
&bank_forks,
&leader_schedule_cache,
&subscriptions,
&rpc_subscriptions,
&mut progress,
);
generate_new_bank_forks_time.stop();
@@ -386,7 +386,7 @@ impl ReplayStage {
&replay_vote_sender,
&bank_notification_sender,
&rewards_recorder_sender,
&subscriptions,
&rpc_subscriptions,
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&mut unfrozen_gossip_verified_vote_hashes,
@@ -577,7 +577,7 @@ impl ReplayStage {
&lockouts_sender,
&accounts_background_request_sender,
&latest_root_senders,
&subscriptions,
&rpc_subscriptions,
&block_commitment_cache,
&mut heaviest_subtree_fork_choice,
&bank_notification_sender,
@@ -667,7 +667,7 @@ impl ReplayStage {
&bank_forks,
&poh_recorder,
&leader_schedule_cache,
&subscriptions,
&rpc_subscriptions,
&progress,
&retransmit_slots_sender,
&mut skipped_slots_info,
@@ -1069,7 +1069,7 @@ impl ReplayStage {
bank_forks: &Arc<RwLock<BankForks>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
subscriptions: &Arc<RpcSubscriptions>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
progress_map: &ProgressMap,
retransmit_slots_sender: &RetransmitSlotsSender,
skipped_slots_info: &mut SkippedSlotsInfo,
@@ -1181,7 +1181,7 @@ impl ReplayStage {
poh_slot,
root_slot,
my_pubkey,
subscriptions,
rpc_subscriptions,
);
let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank);
@@ -1230,7 +1230,7 @@ impl ReplayStage {
bank: &Bank,
root: Slot,
err: &BlockstoreProcessorError,
subscriptions: &Arc<RpcSubscriptions>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
progress: &mut ProgressMap,
@@ -1264,7 +1264,7 @@ impl ReplayStage {
blockstore
.set_dead_slot(slot)
.expect("Failed to mark slot as dead in blockstore");
subscriptions.notify_slot_update(SlotUpdate::Dead {
rpc_subscriptions.notify_slot_update(SlotUpdate::Dead {
slot,
err: format!("error: {:?}", err),
timestamp: timestamp(),
@@ -1297,7 +1297,7 @@ impl ReplayStage {
lockouts_sender: &Sender<CommitmentAggregationData>,
accounts_background_request_sender: &AbsRequestSender,
latest_root_senders: &[Sender<Slot>],
subscriptions: &Arc<RpcSubscriptions>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
bank_notification_sender: &Option<BankNotificationSender>,
@@ -1356,7 +1356,7 @@ impl ReplayStage {
has_new_vote_been_rooted,
vote_signatures,
);
subscriptions.notify_roots(rooted_slots);
rpc_subscriptions.notify_roots(rooted_slots);
if let Some(sender) = bank_notification_sender {
sender
.send(BankNotification::Root(root_bank))
@@ -1643,7 +1643,7 @@ impl ReplayStage {
replay_vote_sender: &ReplayVoteSender,
bank_notification_sender: &Option<BankNotificationSender>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
subscriptions: &Arc<RpcSubscriptions>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
@@ -1714,7 +1714,7 @@ impl ReplayStage {
&bank,
root_slot,
&err,
subscriptions,
rpc_subscriptions,
duplicate_slots_tracker,
gossip_duplicate_confirmed_slots,
progress,
@@ -2408,7 +2408,7 @@ impl ReplayStage {
blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
subscriptions: &Arc<RpcSubscriptions>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
progress: &mut ProgressMap,
) {
// Find the next slot that chains to the old slot
@@ -2453,7 +2453,7 @@ impl ReplayStage {
child_slot,
forks.root(),
&leader,
subscriptions,
rpc_subscriptions,
);
let empty: Vec<Pubkey> = vec![];
Self::update_fork_propagated_threshold_from_votes(
@@ -2479,9 +2479,9 @@ impl ReplayStage {
slot: u64,
root_slot: u64,
leader: &Pubkey,
subscriptions: &Arc<RpcSubscriptions>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
) -> Bank {
subscriptions.notify_slot(slot, parent.slot(), root_slot);
rpc_subscriptions.notify_slot(slot, parent.slot(), root_slot);
Bank::new_from_parent(parent, leader, slot)
}
@@ -3191,7 +3191,7 @@ mod tests {
&&VerifyRecyclers::default(),
);
let subscriptions = Arc::new(RpcSubscriptions::new(
let rpc_subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
block_commitment_cache,
@@ -3203,7 +3203,7 @@ mod tests {
&bank0,
0,
err,
&subscriptions,
&rpc_subscriptions,
&mut DuplicateSlotsTracker::default(),
&GossipDuplicateConfirmedSlots::default(),
&mut progress,
@@ -3254,14 +3254,17 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let subscriptions = Arc::new(RpcSubscriptions::new(
let rpc_subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
block_commitment_cache.clone(),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
));
let (lockouts_sender, _) =
AggregateCommitmentService::new(&exit, block_commitment_cache.clone(), subscriptions);
let (lockouts_sender, _) = AggregateCommitmentService::new(
&exit,
block_commitment_cache.clone(),
rpc_subscriptions,
);
assert!(block_commitment_cache
.read()

View File

@@ -18,17 +18,11 @@ use solana_gossip::{
contact_info::ContactInfo,
};
use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats};
use solana_ledger::{
blockstore::{Blockstore, CompletedSlotsReceiver},
leader_schedule_cache::LeaderScheduleCache,
};
use solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache};
use solana_measure::measure::Measure;
use solana_metrics::inc_new_counter_error;
use solana_perf::packet::{Packet, Packets};
use solana_rpc::{
max_slots::MaxSlots, rpc_completed_slots_service::RpcCompletedSlotsService,
rpc_subscriptions::RpcSubscriptions,
};
use solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions};
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
use solana_streamer::streamer::PacketReceiver;
@@ -602,7 +596,6 @@ impl RetransmitStage {
repair_socket: Arc<UdpSocket>,
verified_receiver: Receiver<Vec<Packets>>,
exit: &Arc<AtomicBool>,
rpc_completed_slots_receiver: CompletedSlotsReceiver,
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
epoch_schedule: EpochSchedule,
cfg: Option<Arc<AtomicBool>>,
@@ -619,18 +612,16 @@ impl RetransmitStage {
let (retransmit_sender, retransmit_receiver) = channel();
let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver));
let t_retransmit = retransmitter(
let thread_hdls = retransmitter(
retransmit_sockets,
bank_forks.clone(),
leader_schedule_cache,
cluster_info.clone(),
retransmit_receiver,
max_slots,
rpc_subscriptions.clone(),
rpc_subscriptions,
);
let rpc_completed_slots_hdl =
RpcCompletedSlotsService::spawn(rpc_completed_slots_receiver, rpc_subscriptions);
let cluster_slots_service = ClusterSlotsService::new(
blockstore.clone(),
cluster_slots.clone(),
@@ -677,11 +668,6 @@ impl RetransmitStage {
duplicate_slots_sender,
);
let mut thread_hdls = t_retransmit;
if let Some(thread_hdl) = rpc_completed_slots_hdl {
thread_hdls.push(thread_hdl);
}
Self {
thread_hdls,
window_service,
@@ -751,7 +737,7 @@ mod tests {
let cluster_info = Arc::new(cluster_info);
let (retransmit_sender, retransmit_receiver) = channel();
let t_retransmit = retransmitter(
let _t_retransmit = retransmitter(
retransmit_socket,
bank_forks,
&leader_schedule_cache,
@@ -760,7 +746,6 @@ mod tests {
&Arc::new(MaxSlots::default()),
None,
);
let _thread_hdls = vec![t_retransmit];
let mut shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
let mut packet = Packet::default();

View File

@@ -25,8 +25,7 @@ use crate::{
use crossbeam_channel::unbounded;
use solana_gossip::cluster_info::ClusterInfo;
use solana_ledger::{
blockstore::{Blockstore, CompletedSlotsReceiver},
blockstore_processor::TransactionStatusSender,
blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
leader_schedule_cache::LeaderScheduleCache,
};
use solana_poh::poh_recorder::PohRecorder;
@@ -109,12 +108,11 @@ impl Tvu {
sockets: Sockets,
blockstore: Arc<Blockstore>,
ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
tower: Tower,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>,
completed_slots_receiver: CompletedSlotsReceiver,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
cfg: Option<Arc<AtomicBool>>,
transaction_status_sender: Option<TransactionStatusSender>,
@@ -179,7 +177,6 @@ impl Tvu {
repair_socket,
verified_receiver,
&exit,
completed_slots_receiver,
cluster_slots_update_receiver,
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
cfg,
@@ -190,7 +187,7 @@ impl Tvu {
tvu_config.repair_validators,
completed_data_sets_sender,
max_slots,
Some(subscriptions.clone()),
Some(rpc_subscriptions.clone()),
duplicate_slots_sender,
);
@@ -266,7 +263,7 @@ impl Tvu {
vote_account: *vote_account,
authorized_voter_keypairs,
exit: exit.clone(),
subscriptions: subscriptions.clone(),
rpc_subscriptions: rpc_subscriptions.clone(),
leader_schedule_cache: leader_schedule_cache.clone(),
latest_root_senders: vec![ledger_cleanup_slot_sender],
accounts_background_request_sender,
@@ -381,7 +378,6 @@ pub mod tests {
let BlockstoreSignals {
blockstore,
ledger_signal_receiver,
completed_slots_receiver,
..
} = Blockstore::open_with_signal(&blockstore_path, None, true)
.expect("Expected to successfully open ledger");
@@ -425,7 +421,6 @@ pub mod tests {
tower,
&leader_schedule_cache,
&exit,
completed_slots_receiver,
block_commitment_cache,
None,
None,

View File

@@ -47,6 +47,7 @@ use solana_rpc::{
OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
},
rpc::JsonRpcConfig,
rpc_completed_slots_service::RpcCompletedSlotsService,
rpc_pubsub_service::{PubSubConfig, PubSubService},
rpc_service::JsonRpcService,
rpc_subscriptions::RpcSubscriptions,
@@ -80,7 +81,7 @@ use std::{
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::mpsc::Receiver,
sync::{Arc, Mutex, RwLock},
thread::{sleep, Builder},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
};
@@ -243,6 +244,7 @@ pub struct Validator {
validator_exit: Arc<RwLock<Exit>>,
json_rpc_service: Option<JsonRpcService>,
pubsub_service: Option<PubSubService>,
rpc_completed_slots_service: JoinHandle<()>,
optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_service: Option<RewardsRecorderService>,
@@ -446,7 +448,7 @@ impl Validator {
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = Arc::new(RpcSubscriptions::new_with_vote_subscription(
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_vote_subscription(
&exit,
bank_forks.clone(),
block_commitment_cache.clone(),
@@ -460,7 +462,7 @@ impl Validator {
let completed_data_sets_service = CompletedDataSetsService::new(
completed_data_sets_receiver,
blockstore.clone(),
subscriptions.clone(),
rpc_subscriptions.clone(),
&exit,
max_slots.clone(),
);
@@ -540,7 +542,7 @@ impl Validator {
} else {
Some(PubSubService::new(
config.pubsub_config.clone(),
&subscriptions,
&rpc_subscriptions,
rpc_pubsub_addr,
&exit,
))
@@ -550,7 +552,7 @@ impl Validator {
&exit,
bank_forks.clone(),
optimistically_confirmed_bank,
subscriptions.clone(),
rpc_subscriptions.clone(),
)),
Some(bank_notification_sender),
)
@@ -660,6 +662,10 @@ impl Validator {
let (verified_vote_sender, verified_vote_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded();
let rpc_completed_slots_service =
RpcCompletedSlotsService::spawn(completed_slots_receiver, rpc_subscriptions.clone());
let tvu = Tvu::new(
vote_account,
authorized_voter_keypairs,
@@ -692,12 +698,11 @@ impl Validator {
},
blockstore.clone(),
ledger_signal_receiver,
&subscriptions,
&rpc_subscriptions,
&poh_recorder,
tower,
&leader_schedule_cache,
&exit,
completed_slots_receiver,
block_commitment_cache,
config.enable_partition.clone(),
transaction_status_sender.clone(),
@@ -740,7 +745,7 @@ impl Validator {
node.sockets.tpu,
node.sockets.tpu_forwards,
node.sockets.broadcast,
&subscriptions,
&rpc_subscriptions,
transaction_status_sender,
&blockstore,
&config.broadcast_stage_type,
@@ -765,6 +770,7 @@ impl Validator {
serve_repair_service,
json_rpc_service,
pubsub_service,
rpc_completed_slots_service,
optimistically_confirmed_bank_tracker,
transaction_status_service,
rewards_recorder_service,
@@ -828,6 +834,10 @@ impl Validator {
pubsub_service.join().expect("pubsub_service");
}
self.rpc_completed_slots_service
.join()
.expect("rpc_completed_slots_service");
if let Some(optimistically_confirmed_bank_tracker) =
self.optimistically_confirmed_bank_tracker
{

View File

@@ -13,23 +13,20 @@ pub struct RpcCompletedSlotsService;
impl RpcCompletedSlotsService {
pub fn spawn(
completed_slots_receiver: CompletedSlotsReceiver,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
) -> Option<JoinHandle<()>> {
let rpc_subscriptions = rpc_subscriptions?;
Some(
Builder::new()
.name("solana-rpc-completed-slots-service".to_string())
.spawn(move || {
for slots in completed_slots_receiver.iter() {
for slot in slots {
rpc_subscriptions.notify_slot_update(SlotUpdate::Completed {
slot,
timestamp: timestamp(),
});
}
rpc_subscriptions: Arc<RpcSubscriptions>,
) -> JoinHandle<()> {
Builder::new()
.name("solana-rpc-completed-slots-service".to_string())
.spawn(move || {
for slots in completed_slots_receiver.iter() {
for slot in slots {
rpc_subscriptions.notify_slot_update(SlotUpdate::Completed {
slot,
timestamp: timestamp(),
});
}
})
.unwrap(),
)
}
})
.unwrap()
}
}
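Because the service is now spawned from the validator, which always holds an RpcSubscriptions instance, spawn() no longer takes Option<Arc<RpcSubscriptions>> or returns Option<JoinHandle<()>>: callers receive a plain JoinHandle<()> and join it unconditionally at shutdown, as the validator changes above show.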