From 61f0a7d9c36ddc2e8612b4fc7b317e79ebc3141f Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 5 Jul 2022 14:29:44 +0000 Subject: [PATCH] replaces Mutex with RwLock (#26370) Mutex causes superfluous lock contention when a read-only reference suffices. --- banking-bench/src/main.rs | 16 ++-- core/benches/banking_stage.rs | 4 +- core/src/banking_stage.rs | 116 ++++++++++++------------- core/src/cluster_info_vote_listener.rs | 10 +-- core/src/fetch_stage.rs | 12 +-- core/src/replay_stage.rs | 32 +++---- core/src/tpu.rs | 4 +- core/src/tvu.rs | 4 +- core/src/validator.rs | 6 +- core/src/voting_service.rs | 6 +- core/src/warm_quic_cache_service.rs | 6 +- poh/src/poh_recorder.rs | 6 +- poh/src/poh_service.rs | 32 +++---- rpc/src/cluster_tpu_info.rs | 10 +-- rpc/src/rpc_service.rs | 4 +- 15 files changed, 134 insertions(+), 134 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index ea0438c36a..7f318ef5a1 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -30,7 +30,7 @@ use { }, solana_streamer::socket::SocketAddrSpace, std::{ - sync::{atomic::Ordering, Arc, Mutex, RwLock}, + sync::{atomic::Ordering, Arc, RwLock}, thread::sleep, time::{Duration, Instant}, }, @@ -39,7 +39,7 @@ use { fn check_txs( receiver: &Arc>, ref_tx_count: usize, - poh_recorder: &Arc>, + poh_recorder: &Arc>, ) -> bool { let mut total = 0; let now = Instant::now(); @@ -55,7 +55,7 @@ fn check_txs( if now.elapsed().as_secs() > 60 { break; } - if poh_recorder.lock().unwrap().bank().is_none() { + if poh_recorder.read().unwrap().bank().is_none() { no_bank = true; break; } @@ -358,7 +358,7 @@ fn main() { DEFAULT_TPU_CONNECTION_POOL_SIZE, )), ); - poh_recorder.lock().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank(&bank, false); // This is so that the signal_receiver does not go out of scope after the closure. // If it is dropped before poh_service, then poh_service will error when @@ -396,7 +396,7 @@ fn main() { if bank.get_signature_status(&tx.signatures[0]).is_some() { break; } - if poh_recorder.lock().unwrap().bank().is_none() { + if poh_recorder.read().unwrap().bank().is_none() { break; } sleep(Duration::from_millis(5)); @@ -418,7 +418,7 @@ fn main() { let mut poh_time = Measure::start("poh_time"); poh_recorder - .lock() + .write() .unwrap() .reset(bank.clone(), Some((bank.slot(), bank.slot() + 1))); poh_time.stop(); @@ -439,8 +439,8 @@ fn main() { std::u64::MAX, ); - poh_recorder.lock().unwrap().set_bank(&bank, false); - assert!(poh_recorder.lock().unwrap().bank().is_some()); + poh_recorder.write().unwrap().set_bank(&bank, false); + assert!(poh_recorder.read().unwrap().bank().is_some()); if bank.slot() > 32 { leader_schedule_cache.set_root(&bank); bank_forks.set_root(root, &AbsRequestSender::default(), None); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index fe788dca2c..2562ad3789 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -74,7 +74,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { let (exit, poh_recorder, poh_service, _signal_receiver) = create_test_recorder(&bank, &blockstore, None, None); - let recorder = poh_recorder.lock().unwrap().recorder(); + let recorder = poh_recorder.read().unwrap().recorder(); let tx = test_tx(); let transactions = vec![tx; 4194304]; @@ -233,7 +233,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { Arc::new(RwLock::new(CostModel::default())), Arc::new(ConnectionCache::default()), ); - poh_recorder.lock().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank(&bank, false); let chunk_len = verified.len() / CHUNKS; let mut start = 0; diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 7cde63c052..8a3f4e3a9a 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -69,7 +69,7 @@ use { rc::Rc, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, - Arc, Mutex, RwLock, + Arc, RwLock, }, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, @@ -411,7 +411,7 @@ impl BankingStage { #[allow(clippy::new_ret_no_self)] pub fn new( cluster_info: &Arc, - poh_recorder: &Arc>, + poh_recorder: &Arc>, verified_receiver: BankingPacketReceiver, tpu_verified_vote_receiver: BankingPacketReceiver, verified_vote_receiver: BankingPacketReceiver, @@ -437,7 +437,7 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] pub fn new_num_threads( cluster_info: &Arc, - poh_recorder: &Arc>, + poh_recorder: &Arc>, verified_receiver: BankingPacketReceiver, tpu_verified_vote_receiver: BankingPacketReceiver, verified_vote_receiver: BankingPacketReceiver, @@ -539,7 +539,7 @@ impl BankingStage { connection_cache: &ConnectionCache, forward_option: &ForwardOption, cluster_info: &ClusterInfo, - poh_recorder: &Arc>, + poh_recorder: &Arc>, socket: &UdpSocket, filter_forwarding_results: &FilterForwardingResults, data_budget: &DataBudget, @@ -640,7 +640,7 @@ impl BankingStage { pub fn consume_buffered_packets( my_pubkey: &Pubkey, max_tx_ingestion_ns: u128, - poh_recorder: &Arc>, + poh_recorder: &Arc>, buffered_packet_batches: &mut UnprocessedPacketBatches, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, @@ -672,8 +672,8 @@ impl BankingStage { // TODO: Right now we iterate through buffer and try the highest weighted transaction once // but we should retry the highest weighted transactions more often. let (bank_start, poh_recorder_lock_time) = measure!( - poh_recorder.lock().unwrap().bank_start(), - "poh_recorder_lock", + poh_recorder.read().unwrap().bank_start(), + "poh_recorder.read", ); slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us( poh_recorder_lock_time.as_us(), @@ -718,7 +718,7 @@ impl BankingStage { { let poh_recorder_lock_time = { let (poh_recorder_locked, poh_recorder_lock_time) = - measure!(poh_recorder.lock().unwrap(), "poh_recorder_lock"); + measure!(poh_recorder.read().unwrap(), "poh_recorder.read"); reached_end_of_slot = Some(EndOfSlot { next_slot_leader: poh_recorder_locked.next_slot_leader(), @@ -783,7 +783,7 @@ impl BankingStage { // packet batches in buffer let poh_recorder_lock_time = { let (poh_recorder_locked, poh_recorder_lock_time) = - measure!(poh_recorder.lock().unwrap(), "poh_recorder_lock"); + measure!(poh_recorder.read().unwrap(), "poh_recorder.read"); reached_end_of_slot = Some(EndOfSlot { next_slot_leader: poh_recorder_locked.next_slot_leader(), @@ -907,7 +907,7 @@ impl BankingStage { fn process_buffered_packets( my_pubkey: &Pubkey, socket: &UdpSocket, - poh_recorder: &Arc>, + poh_recorder: &Arc>, cluster_info: &ClusterInfo, buffered_packet_batches: &mut UnprocessedPacketBatches, forward_option: &ForwardOption, @@ -930,7 +930,7 @@ impl BankingStage { would_be_leader, would_be_leader_shortly, ) = { - let poh = poh_recorder.lock().unwrap(); + let poh = poh_recorder.read().unwrap(); bank_start = poh.bank_start(); ( poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET), @@ -1037,7 +1037,7 @@ impl BankingStage { forward_option: &ForwardOption, cluster_info: &ClusterInfo, buffered_packet_batches: &mut UnprocessedPacketBatches, - poh_recorder: &Arc>, + poh_recorder: &Arc>, socket: &UdpSocket, hold: bool, data_budget: &DataBudget, @@ -1100,7 +1100,7 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] fn process_loop( verified_receiver: &BankingPacketReceiver, - poh_recorder: &Arc>, + poh_recorder: &Arc>, cluster_info: &ClusterInfo, recv_start: &mut Instant, forward_option: ForwardOption, @@ -1112,7 +1112,7 @@ impl BankingStage { cost_model: Arc>, connection_cache: Arc, ) { - let recorder = poh_recorder.lock().unwrap().recorder(); + let recorder = poh_recorder.read().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit); let mut banking_stage_stats = BankingStageStats::new(id); @@ -2237,35 +2237,35 @@ impl BankingStage { pub(crate) fn next_leader_tpu( cluster_info: &ClusterInfo, - poh_recorder: &Mutex, + poh_recorder: &RwLock, ) -> Option<(Pubkey, std::net::SocketAddr)> { next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu) } fn next_leader_tpu_forwards( cluster_info: &ClusterInfo, - poh_recorder: &Mutex, + poh_recorder: &RwLock, ) -> Option<(Pubkey, std::net::SocketAddr)> { next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_forwards) } pub(crate) fn next_leader_tpu_vote( cluster_info: &ClusterInfo, - poh_recorder: &Mutex, + poh_recorder: &RwLock, ) -> Option<(Pubkey, std::net::SocketAddr)> { next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_vote) } fn next_leader_x( cluster_info: &ClusterInfo, - poh_recorder: &Mutex, + poh_recorder: &RwLock, port_selector: F, ) -> Option<(Pubkey, std::net::SocketAddr)> where F: FnOnce(&ContactInfo) -> SocketAddr, { let leader_pubkey = poh_recorder - .lock() + .read() .unwrap() .leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET); if let Some(leader_pubkey) = leader_pubkey { @@ -2711,11 +2711,11 @@ mod tests { Arc::new(AtomicBool::default()), ); let recorder = poh_recorder.recorder(); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.lock().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank(&bank, false); let pubkey = solana_sdk::pubkey::new_rand(); let keypair2 = Keypair::new(); let pubkey2 = solana_sdk::pubkey::new_rand(); @@ -2740,7 +2740,7 @@ mod tests { assert!(entry_receiver.try_recv().is_err()); poh_recorder - .lock() + .read() .unwrap() .is_exited .store(true, Ordering::Relaxed); @@ -2933,11 +2933,11 @@ mod tests { Arc::new(AtomicBool::default()), ); let recorder = poh_recorder.recorder(); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.lock().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank(&bank, false); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); let process_transactions_batch_output = BankingStage::process_and_record_transactions( @@ -2964,8 +2964,8 @@ mod tests { assert!(commit_transactions_result.is_ok()); // Tick up to max tick height - while poh_recorder.lock().unwrap().tick_height() != bank.max_tick_height() { - poh_recorder.lock().unwrap().tick(); + while poh_recorder.read().unwrap().tick_height() != bank.max_tick_height() { + poh_recorder.write().unwrap().tick(); } let mut done = false; @@ -3021,7 +3021,7 @@ mod tests { ); poh_recorder - .lock() + .read() .unwrap() .is_exited .store(true, Ordering::Relaxed); @@ -3068,11 +3068,11 @@ mod tests { Arc::new(AtomicBool::default()), ); let recorder = poh_recorder.recorder(); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.lock().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank(&bank, false); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); let process_transactions_batch_output = BankingStage::process_and_record_transactions( @@ -3104,7 +3104,7 @@ mod tests { ); poh_recorder - .lock() + .read() .unwrap() .is_exited .store(true, Ordering::Relaxed); @@ -3141,11 +3141,11 @@ mod tests { Arc::new(AtomicBool::default()), ); let recorder = poh_recorder.recorder(); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.lock().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank(&bank, false); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); let qos_service = QosService::new(Arc::new(RwLock::new(CostModel::default())), 1); @@ -3229,7 +3229,7 @@ mod tests { assert_eq!(get_tx_count(), 2); poh_recorder - .lock() + .read() .unwrap() .is_exited .store(true, Ordering::Relaxed); @@ -3240,10 +3240,10 @@ mod tests { fn simulate_poh( record_receiver: CrossbeamReceiver, - poh_recorder: &Arc>, + poh_recorder: &Arc>, ) -> JoinHandle<()> { let poh_recorder = poh_recorder.clone(); - let is_exited = poh_recorder.lock().unwrap().is_exited.clone(); + let is_exited = poh_recorder.read().unwrap().is_exited.clone(); let tick_producer = Builder::new() .name("solana-simulate_poh".to_string()) .spawn(move || loop { @@ -3293,9 +3293,9 @@ mod tests { Arc::new(AtomicBool::default()), ); let recorder = poh_recorder.recorder(); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); - poh_recorder.lock().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank(&bank, false); let poh_simulator = simulate_poh(record_receiver, &poh_recorder); @@ -3312,7 +3312,7 @@ mod tests { ); poh_recorder - .lock() + .read() .unwrap() .is_exited .store(true, Ordering::Relaxed); @@ -3431,7 +3431,7 @@ mod tests { // record let recorder = poh_recorder.recorder(); - let poh_simulator = simulate_poh(record_receiver, &Arc::new(Mutex::new(poh_recorder))); + let poh_simulator = simulate_poh(record_receiver, &Arc::new(RwLock::new(poh_recorder))); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); @@ -3493,9 +3493,9 @@ mod tests { Arc::new(AtomicBool::default()), ); let recorder = poh_recorder.recorder(); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); - poh_recorder.lock().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank(&bank, false); let poh_simulator = simulate_poh(record_receiver, &poh_recorder); @@ -3512,7 +3512,7 @@ mod tests { ); poh_recorder - .lock() + .read() .unwrap() .is_exited .store(true, Ordering::Relaxed); @@ -3698,11 +3698,11 @@ mod tests { Arc::new(AtomicBool::default()), ); let recorder = poh_recorder.recorder(); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.lock().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank(&bank, false); let shreds = entries_to_test_shreds(&entries, bank.slot(), 0, true, 0); blockstore.insert_shreds(shreds, None, false).unwrap(); @@ -3756,7 +3756,7 @@ mod tests { assert_eq!(actual_tx_results, expected_tx_results); poh_recorder - .lock() + .read() .unwrap() .is_exited .store(true, Ordering::Relaxed); @@ -3859,11 +3859,11 @@ mod tests { Arc::new(AtomicBool::default()), ); let recorder = poh_recorder.recorder(); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.lock().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank(&bank, false); let shreds = entries_to_test_shreds(&entries, bank.slot(), 0, true, 0); blockstore.insert_shreds(shreds, None, false).unwrap(); @@ -3914,7 +3914,7 @@ mod tests { } ); poh_recorder - .lock() + .read() .unwrap() .is_exited .store(true, Ordering::Relaxed); @@ -3929,7 +3929,7 @@ mod tests { ) -> ( Vec, Arc, - Arc>, + Arc>, Receiver, JoinHandle<()>, ) { @@ -3956,7 +3956,7 @@ mod tests { &Arc::new(PohConfig::default()), exit, ); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); // Set up unparallelizable conflicting transactions let pubkey0 = solana_sdk::pubkey::new_rand(); @@ -3984,7 +3984,7 @@ mod tests { { let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) = setup_conflicting_transactions(ledger_path.path()); - let recorder = poh_recorder.lock().unwrap().recorder(); + let recorder = poh_recorder.read().unwrap().recorder(); let num_conflicting_transactions = transactions.len(); let deserialized_packets = unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions) @@ -3999,7 +3999,7 @@ mod tests { let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); // When the working bank in poh_recorder is None, no packets should be processed - assert!(!poh_recorder.lock().unwrap().has_bank()); + assert!(!poh_recorder.read().unwrap().has_bank()); let max_tx_processing_ns = std::u128::MAX; BankingStage::consume_buffered_packets( &Pubkey::default(), @@ -4020,7 +4020,7 @@ mod tests { // Processes one packet per iteration of the loop let num_packets_to_process_per_iteration = num_conflicting_transactions; for num_expected_unprocessed in (0..num_conflicting_transactions).rev() { - poh_recorder.lock().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank(&bank, false); BankingStage::consume_buffered_packets( &Pubkey::default(), max_tx_processing_ns, @@ -4042,7 +4042,7 @@ mod tests { } } poh_recorder - .lock() + .read() .unwrap() .is_exited .store(true, Ordering::Relaxed); @@ -4069,9 +4069,9 @@ mod tests { // each iteration of this loop will process one element of the batch per iteration of the // loop. let interrupted_iteration = 1; - poh_recorder.lock().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank(&bank, false); let poh_recorder_ = poh_recorder.clone(); - let recorder = poh_recorder_.lock().unwrap().recorder(); + let recorder = poh_recorder_.read().unwrap().recorder(); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); // Start up thread to process the banks let t_consume = Builder::new() @@ -4126,7 +4126,7 @@ mod tests { finished_packet_receiver.recv().unwrap(); if i == interrupted_iteration { poh_recorder - .lock() + .write() .unwrap() .schedule_dummy_max_height_reached_failure(); } @@ -4135,7 +4135,7 @@ mod tests { t_consume.join().unwrap(); poh_recorder - .lock() + .read() .unwrap() .is_exited .store(true, Ordering::Relaxed); diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 02d858a1a1..98a31b7b08 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -48,7 +48,7 @@ use { iter::repeat, sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, RwLock, + Arc, RwLock, }, thread::{self, sleep, Builder, JoinHandle}, time::{Duration, Instant}, @@ -234,7 +234,7 @@ impl ClusterInfoVoteListener { exit: Arc, cluster_info: Arc, verified_packets_sender: BankingPacketSender, - poh_recorder: Arc>, + poh_recorder: Arc>, vote_tracker: Arc, bank_forks: Arc>, subscriptions: Arc, @@ -375,7 +375,7 @@ impl ClusterInfoVoteListener { fn bank_send_loop( exit: Arc, verified_vote_label_packets_receiver: VerifiedLabelVotePacketsReceiver, - poh_recorder: Arc>, + poh_recorder: Arc>, verified_packets_sender: &BankingPacketSender, ) -> Result<()> { let mut verified_vote_packets = VerifiedVotePackets::default(); @@ -388,7 +388,7 @@ impl ClusterInfoVoteListener { } let would_be_leader = poh_recorder - .lock() + .read() .unwrap() .would_be_leader(3 * slot_hashes::MAX_ENTRIES as u64 * DEFAULT_TICKS_PER_SLOT); @@ -409,7 +409,7 @@ impl ClusterInfoVoteListener { // Always set this to avoid taking the poh lock too often time_since_lock = Instant::now(); // We will take this lock at most once every `BANK_SEND_VOTES_LOOP_SLEEP_MS` - let current_working_bank = poh_recorder.lock().unwrap().bank(); + let current_working_bank = poh_recorder.read().unwrap().bank(); if let Some(current_working_bank) = current_working_bank { Self::check_for_leader_bank_and_send_votes( &mut bank_vote_sender_state_option, diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index d15e9c9bd0..c041739d7c 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -20,7 +20,7 @@ use { net::UdpSocket, sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, + Arc, RwLock, }, thread::{self, sleep, Builder, JoinHandle}, time::Duration, @@ -38,7 +38,7 @@ impl FetchStage { tpu_forwards_sockets: Vec, tpu_vote_sockets: Vec, exit: &Arc, - poh_recorder: &Arc>, + poh_recorder: &Arc>, coalesce_ms: u64, ) -> (Self, PacketBatchReceiver, PacketBatchReceiver) { let (sender, receiver) = unbounded(); @@ -73,7 +73,7 @@ impl FetchStage { vote_sender: &PacketBatchSender, forward_sender: &PacketBatchSender, forward_receiver: PacketBatchReceiver, - poh_recorder: &Arc>, + poh_recorder: &Arc>, coalesce_ms: u64, in_vote_only_mode: Option>, ) -> Self { @@ -98,7 +98,7 @@ impl FetchStage { fn handle_forwarded_packets( recvr: &PacketBatchReceiver, sendr: &PacketBatchSender, - poh_recorder: &Arc>, + poh_recorder: &Arc>, ) -> Result<()> { let mark_forwarded = |packet: &mut Packet| { packet.meta.flags |= PacketFlags::FORWARDED; @@ -119,7 +119,7 @@ impl FetchStage { } if poh_recorder - .lock() + .read() .unwrap() .would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET.saturating_mul(DEFAULT_TICKS_PER_SLOT)) { @@ -147,7 +147,7 @@ impl FetchStage { vote_sender: &PacketBatchSender, forward_sender: &PacketBatchSender, forward_receiver: PacketBatchReceiver, - poh_recorder: &Arc>, + poh_recorder: &Arc>, coalesce_ms: u64, in_vote_only_mode: Option>, ) -> Self { diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index a70f763f6e..2cd43155ac 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -71,7 +71,7 @@ use { result, sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, RwLock, + Arc, RwLock, }, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, @@ -360,7 +360,7 @@ impl ReplayStage { cluster_info: Arc, ledger_signal_receiver: Receiver, duplicate_slots_receiver: DuplicateSlotReceiver, - poh_recorder: Arc>, + poh_recorder: Arc>, maybe_process_blockstore: Option, vote_tracker: Arc, cluster_slots: Arc, @@ -476,7 +476,7 @@ impl ReplayStage { ); generate_new_bank_forks_time.stop(); - let mut tpu_has_bank = poh_recorder.lock().unwrap().has_bank(); + let mut tpu_has_bank = poh_recorder.read().unwrap().has_bank(); let mut replay_active_banks_time = Measure::start("replay_active_banks_time"); let mut ancestors = bank_forks.read().unwrap().ancestors(); @@ -833,7 +833,7 @@ impl ReplayStage { let mut start_leader_time = Measure::start("start_leader_time"); let mut dump_then_repair_correct_slots_time = Measure::start("dump_then_repair_correct_slots_time"); // Used for correctness check - let poh_bank = poh_recorder.lock().unwrap().bank(); + let poh_bank = poh_recorder.read().unwrap().bank(); // Dump any duplicate slots that have been confirmed by the network in // anticipation of repairing the confirmed version of the slot. // @@ -868,7 +868,7 @@ impl ReplayStage { transaction_status_sender.is_some(), ); - let poh_bank = poh_recorder.lock().unwrap().bank(); + let poh_bank = poh_recorder.read().unwrap().bank(); if let Some(bank) = poh_bank { Self::log_leader_change( &my_pubkey, @@ -1000,11 +1000,11 @@ impl ReplayStage { } fn retransmit_latest_unpropagated_leader_slot( - poh_recorder: &Arc>, + poh_recorder: &Arc>, retransmit_slots_sender: &RetransmitSlotsSender, progress: &mut ProgressMap, ) { - let start_slot = poh_recorder.lock().unwrap().start_slot(); + let start_slot = poh_recorder.read().unwrap().start_slot(); if let (false, Some(latest_leader_slot)) = progress.get_leader_propagation_slot_must_exist(start_slot) @@ -1545,7 +1545,7 @@ impl ReplayStage { fn maybe_start_leader( my_pubkey: &Pubkey, bank_forks: &Arc>, - poh_recorder: &Arc>, + poh_recorder: &Arc>, leader_schedule_cache: &Arc, rpc_subscriptions: &Arc, progress_map: &mut ProgressMap, @@ -1554,12 +1554,12 @@ impl ReplayStage { has_new_vote_been_rooted: bool, track_transaction_indexes: bool, ) { - // all the individual calls to poh_recorder.lock() are designed to + // all the individual calls to poh_recorder.read() are designed to // increase granularity, decrease contention - assert!(!poh_recorder.lock().unwrap().has_bank()); + assert!(!poh_recorder.read().unwrap().has_bank()); - let (poh_slot, parent_slot) = match poh_recorder.lock().unwrap().reached_leader_slot() { + let (poh_slot, parent_slot) = match poh_recorder.read().unwrap().reached_leader_slot() { PohLeaderStatus::Reached { poh_slot, parent_slot, @@ -1674,7 +1674,7 @@ impl ReplayStage { let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank); poh_recorder - .lock() + .write() .unwrap() .set_bank(&tpu_bank, track_transaction_indexes); } else { @@ -2150,7 +2150,7 @@ impl ReplayStage { my_pubkey: &Pubkey, blockstore: &Blockstore, bank: &Arc, - poh_recorder: &Mutex, + poh_recorder: &RwLock, leader_schedule_cache: &LeaderScheduleCache, ) { let next_leader_slot = leader_schedule_cache.next_leader_slot( @@ -2161,7 +2161,7 @@ impl ReplayStage { GRACE_TICKS_FACTOR * MAX_GRACE_SLOTS, ); poh_recorder - .lock() + .write() .unwrap() .reset(bank.clone(), next_leader_slot); @@ -3271,7 +3271,7 @@ pub(crate) mod tests { my_pubkey: Pubkey, cluster_info: ClusterInfo, leader_schedule_cache: Arc, - poh_recorder: Mutex, + poh_recorder: RwLock, tower: Tower, rpc_subscriptions: Arc, pub vote_simulator: VoteSimulator, @@ -3322,7 +3322,7 @@ pub(crate) mod tests { // PohRecorder let working_bank = bank_forks.read().unwrap().working_bank(); - let poh_recorder = Mutex::new( + let poh_recorder = RwLock::new( PohRecorder::new( working_bank.tick_height(), working_bank.last_blockhash(), diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 720d7cf3c5..72acd127db 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -36,7 +36,7 @@ use { }, std::{ net::UdpSocket, - sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, + sync::{atomic::AtomicBool, Arc, RwLock}, thread, }, }; @@ -73,7 +73,7 @@ impl Tpu { #[allow(clippy::too_many_arguments)] pub fn new( cluster_info: &Arc, - poh_recorder: &Arc>, + poh_recorder: &Arc>, entry_receiver: Receiver, retransmit_slots_receiver: RetransmitSlotsReceiver, sockets: TpuSockets, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 7b4de2cdd3..6e14d19794 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -52,7 +52,7 @@ use { std::{ collections::HashSet, net::UdpSocket, - sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, + sync::{atomic::AtomicBool, Arc, RwLock}, thread, }, }; @@ -105,7 +105,7 @@ impl Tvu { blockstore: Arc, ledger_signal_receiver: Receiver, rpc_subscriptions: &Arc, - poh_recorder: &Arc>, + poh_recorder: &Arc>, maybe_process_block_store: Option, tower_storage: Arc, leader_schedule_cache: &Arc, diff --git a/core/src/validator.rs b/core/src/validator.rs index 0a8245d326..6e6b4b0791 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -106,7 +106,7 @@ use { path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, Mutex, RwLock, + Arc, RwLock, }, thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, @@ -341,7 +341,7 @@ pub struct Validator { serve_repair_service: ServeRepairService, completed_data_sets_service: CompletedDataSetsService, snapshot_packager_service: Option, - poh_recorder: Arc>, + poh_recorder: Arc>, poh_service: PohService, tpu: Tpu, tvu: Tvu, @@ -755,7 +755,7 @@ impl Validator { exit.clone(), ) }; - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let use_quic = UseQUIC::new(use_quic).expect("Failed to initialize QUIC flags"); let connection_cache = Arc::new(ConnectionCache::new(use_quic, tpu_connection_pool_size)); diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs index 4f7e585d61..29cf4699dd 100644 --- a/core/src/voting_service.rs +++ b/core/src/voting_service.rs @@ -7,7 +7,7 @@ use { solana_runtime::bank_forks::BankForks, solana_sdk::{clock::Slot, transaction::Transaction}, std::{ - sync::{Arc, Mutex, RwLock}, + sync::{Arc, RwLock}, thread::{self, Builder, JoinHandle}, }, }; @@ -41,7 +41,7 @@ impl VotingService { pub fn new( vote_receiver: Receiver, cluster_info: Arc, - poh_recorder: Arc>, + poh_recorder: Arc>, tower_storage: Arc, bank_forks: Arc>, ) -> Self { @@ -66,7 +66,7 @@ impl VotingService { pub fn handle_vote( cluster_info: &ClusterInfo, - poh_recorder: &Mutex, + poh_recorder: &RwLock, tower_storage: &dyn TowerStorage, vote_op: VoteOp, send_to_tpu_vote_port: bool, diff --git a/core/src/warm_quic_cache_service.rs b/core/src/warm_quic_cache_service.rs index 86fb9c80dd..2632d03101 100644 --- a/core/src/warm_quic_cache_service.rs +++ b/core/src/warm_quic_cache_service.rs @@ -9,7 +9,7 @@ use { std::{ sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, + Arc, RwLock, }, thread::{self, sleep, Builder, JoinHandle}, time::Duration, @@ -28,7 +28,7 @@ impl WarmQuicCacheService { pub fn new( connection_cache: Arc, cluster_info: Arc, - poh_recorder: Arc>, + poh_recorder: Arc>, exit: Arc, ) -> Self { let thread_hdl = Builder::new() @@ -38,7 +38,7 @@ impl WarmQuicCacheService { let mut maybe_last_leader = None; while !exit.load(Ordering::Relaxed) { let leader_pubkey = poh_recorder - .lock() + .read() .unwrap() .leader_after_n_slots((CACHE_OFFSET_SLOT + slot_jitter) as u64); if let Some(leader_pubkey) = leader_pubkey { diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 000d58ab46..e685b950be 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -32,7 +32,7 @@ use { cmp, sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, + Arc, Mutex, RwLock, }, time::{Duration, Instant}, }, @@ -949,7 +949,7 @@ pub fn create_test_recorder( leader_schedule_cache: Option>, ) -> ( Arc, - Arc>, + Arc>, PohService, Receiver, ) { @@ -973,7 +973,7 @@ pub fn create_test_recorder( ); poh_recorder.set_bank(bank, false); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let poh_service = PohService::new( poh_recorder.clone(), &poh_config, diff --git a/poh/src/poh_service.rs b/poh/src/poh_service.rs index fda686f115..70f4d2f1ed 100644 --- a/poh/src/poh_service.rs +++ b/poh/src/poh_service.rs @@ -10,7 +10,7 @@ use { std::{ sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, + Arc, Mutex, RwLock, }, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, @@ -95,7 +95,7 @@ impl PohTiming { impl PohService { pub fn new( - poh_recorder: Arc>, + poh_recorder: Arc>, poh_config: &Arc, poh_exit: &Arc, ticks_per_slot: u64, @@ -163,7 +163,7 @@ impl PohService { } fn sleepy_tick_producer( - poh_recorder: Arc>, + poh_recorder: Arc>, poh_config: &PohConfig, poh_exit: &AtomicBool, record_receiver: Receiver, @@ -180,13 +180,13 @@ impl PohService { ); if remaining_tick_time.is_zero() { last_tick = Instant::now(); - poh_recorder.lock().unwrap().tick(); + poh_recorder.write().unwrap().tick(); } } } pub fn read_record_receiver_and_process( - poh_recorder: &Arc>, + poh_recorder: &Arc>, record_receiver: &Receiver, timeout: Duration, ) { @@ -194,7 +194,7 @@ impl PohService { if let Ok(record) = record { if record .sender - .send(poh_recorder.lock().unwrap().record( + .send(poh_recorder.write().unwrap().record( record.slot, record.mixin, record.transactions, @@ -207,7 +207,7 @@ impl PohService { } fn short_lived_sleepy_tick_producer( - poh_recorder: Arc>, + poh_recorder: Arc>, poh_config: &PohConfig, poh_exit: &AtomicBool, record_receiver: Receiver, @@ -227,7 +227,7 @@ impl PohService { ); if remaining_tick_time.is_zero() { last_tick = Instant::now(); - poh_recorder.lock().unwrap().tick(); + poh_recorder.write().unwrap().tick(); elapsed_ticks += 1; } if poh_exit.load(Ordering::Relaxed) && !warned { @@ -240,7 +240,7 @@ impl PohService { // returns true if we need to tick fn record_or_hash( next_record: &mut Option, - poh_recorder: &Arc>, + poh_recorder: &Arc>, timing: &mut PohTiming, record_receiver: &Receiver, hashes_per_batch: u64, @@ -252,7 +252,7 @@ impl PohService { // received message to record // so, record for as long as we have queued up record requests let mut lock_time = Measure::start("lock"); - let mut poh_recorder_l = poh_recorder.lock().unwrap(); + let mut poh_recorder_l = poh_recorder.write().unwrap(); lock_time.stop(); timing.total_lock_time_ns += lock_time.as_ns(); let mut record_time = Measure::start("record"); @@ -332,14 +332,14 @@ impl PohService { } fn tick_producer( - poh_recorder: Arc>, + poh_recorder: Arc>, poh_exit: &AtomicBool, ticks_per_slot: u64, hashes_per_batch: u64, record_receiver: Receiver, target_ns_per_tick: u64, ) { - let poh = poh_recorder.lock().unwrap().poh.clone(); + let poh = poh_recorder.read().unwrap().poh.clone(); let mut timing = PohTiming::new(); let mut next_record = None; loop { @@ -356,7 +356,7 @@ impl PohService { // Lock PohRecorder only for the final hash. record_or_hash will lock PohRecorder for record calls but not for hashing. { let mut lock_time = Measure::start("lock"); - let mut poh_recorder_l = poh_recorder.lock().unwrap(); + let mut poh_recorder_l = poh_recorder.write().unwrap(); lock_time.stop(); timing.total_lock_time_ns += lock_time.as_ns(); let mut tick_time = Measure::start("tick"); @@ -436,7 +436,7 @@ mod tests { &poh_config, exit.clone(), ); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let ticks_per_slot = bank.ticks_per_slot(); let bank_slot = bank.slot(); @@ -462,7 +462,7 @@ mod tests { loop { // send some data let mut time = Measure::start("record"); - let _ = poh_recorder.lock().unwrap().record( + let _ = poh_recorder.write().unwrap().record( bank_slot, h1, vec![tx.clone()], @@ -500,7 +500,7 @@ mod tests { hashes_per_batch, record_receiver, ); - poh_recorder.lock().unwrap().set_bank(&bank, false); + poh_recorder.write().unwrap().set_bank(&bank, false); // get some events let mut hashes = 0; diff --git a/rpc/src/cluster_tpu_info.rs b/rpc/src/cluster_tpu_info.rs index bd7ad572d9..7e1982cf50 100644 --- a/rpc/src/cluster_tpu_info.rs +++ b/rpc/src/cluster_tpu_info.rs @@ -6,19 +6,19 @@ use { std::{ collections::HashMap, net::SocketAddr, - sync::{Arc, Mutex}, + sync::{Arc, RwLock}, }, }; #[derive(Clone)] pub struct ClusterTpuInfo { cluster_info: Arc, - poh_recorder: Arc>, + poh_recorder: Arc>, recent_peers: HashMap, } impl ClusterTpuInfo { - pub fn new(cluster_info: Arc, poh_recorder: Arc>) -> Self { + pub fn new(cluster_info: Arc, poh_recorder: Arc>) -> Self { Self { cluster_info, poh_recorder, @@ -38,7 +38,7 @@ impl TpuInfo for ClusterTpuInfo { } fn get_leader_tpus(&self, max_count: u64) -> Vec<&SocketAddr> { - let recorder = self.poh_recorder.lock().unwrap(); + let recorder = self.poh_recorder.read().unwrap(); let leaders: Vec<_> = (0..max_count) .filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS)) .collect(); @@ -141,7 +141,7 @@ mod test { .collect(); let leader_info = ClusterTpuInfo { cluster_info, - poh_recorder: Arc::new(Mutex::new(poh_recorder)), + poh_recorder: Arc::new(RwLock::new(poh_recorder)), recent_peers: recent_peers.clone(), }; diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index ba17587012..02c2198b97 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -45,7 +45,7 @@ use { path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, Mutex, RwLock, + Arc, RwLock, }, thread::{self, Builder, JoinHandle}, }, @@ -342,7 +342,7 @@ impl JsonRpcService { block_commitment_cache: Arc>, blockstore: Arc, cluster_info: Arc, - poh_recorder: Option>>, + poh_recorder: Option>>, genesis_hash: Hash, ledger_path: &Path, validator_exit: Arc>,