replaces Mutex<PohRecorder> with RwLock<PohRecorder> (#26370)

Mutex causes superfluous lock contention when a read-only reference suffices.
This commit is contained in:
behzad nouri 2022-07-05 14:29:44 +00:00 committed by GitHub
parent 9ec38a3191
commit 61f0a7d9c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 134 additions and 134 deletions

View File

@ -30,7 +30,7 @@ use {
},
solana_streamer::socket::SocketAddrSpace,
std::{
sync::{atomic::Ordering, Arc, Mutex, RwLock},
sync::{atomic::Ordering, Arc, RwLock},
thread::sleep,
time::{Duration, Instant},
},
@ -39,7 +39,7 @@ use {
fn check_txs(
receiver: &Arc<Receiver<WorkingBankEntry>>,
ref_tx_count: usize,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
) -> bool {
let mut total = 0;
let now = Instant::now();
@ -55,7 +55,7 @@ fn check_txs(
if now.elapsed().as_secs() > 60 {
break;
}
if poh_recorder.lock().unwrap().bank().is_none() {
if poh_recorder.read().unwrap().bank().is_none() {
no_bank = true;
break;
}
@ -358,7 +358,7 @@ fn main() {
DEFAULT_TPU_CONNECTION_POOL_SIZE,
)),
);
poh_recorder.lock().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank(&bank, false);
// This is so that the signal_receiver does not go out of scope after the closure.
// If it is dropped before poh_service, then poh_service will error when
@ -396,7 +396,7 @@ fn main() {
if bank.get_signature_status(&tx.signatures[0]).is_some() {
break;
}
if poh_recorder.lock().unwrap().bank().is_none() {
if poh_recorder.read().unwrap().bank().is_none() {
break;
}
sleep(Duration::from_millis(5));
@ -418,7 +418,7 @@ fn main() {
let mut poh_time = Measure::start("poh_time");
poh_recorder
.lock()
.write()
.unwrap()
.reset(bank.clone(), Some((bank.slot(), bank.slot() + 1)));
poh_time.stop();
@ -439,8 +439,8 @@ fn main() {
std::u64::MAX,
);
poh_recorder.lock().unwrap().set_bank(&bank, false);
assert!(poh_recorder.lock().unwrap().bank().is_some());
poh_recorder.write().unwrap().set_bank(&bank, false);
assert!(poh_recorder.read().unwrap().bank().is_some());
if bank.slot() > 32 {
leader_schedule_cache.set_root(&bank);
bank_forks.set_root(root, &AbsRequestSender::default(), None);

View File

@ -74,7 +74,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
let (exit, poh_recorder, poh_service, _signal_receiver) =
create_test_recorder(&bank, &blockstore, None, None);
let recorder = poh_recorder.lock().unwrap().recorder();
let recorder = poh_recorder.read().unwrap().recorder();
let tx = test_tx();
let transactions = vec![tx; 4194304];
@ -233,7 +233,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::default()),
);
poh_recorder.lock().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank(&bank, false);
let chunk_len = verified.len() / CHUNKS;
let mut start = 0;

View File

@ -69,7 +69,7 @@ use {
rc::Rc,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
@ -411,7 +411,7 @@ impl BankingStage {
#[allow(clippy::new_ret_no_self)]
pub fn new(
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
verified_receiver: BankingPacketReceiver,
tpu_verified_vote_receiver: BankingPacketReceiver,
verified_vote_receiver: BankingPacketReceiver,
@ -437,7 +437,7 @@ impl BankingStage {
#[allow(clippy::too_many_arguments)]
pub fn new_num_threads(
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
verified_receiver: BankingPacketReceiver,
tpu_verified_vote_receiver: BankingPacketReceiver,
verified_vote_receiver: BankingPacketReceiver,
@ -539,7 +539,7 @@ impl BankingStage {
connection_cache: &ConnectionCache,
forward_option: &ForwardOption,
cluster_info: &ClusterInfo,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
socket: &UdpSocket,
filter_forwarding_results: &FilterForwardingResults,
data_budget: &DataBudget,
@ -640,7 +640,7 @@ impl BankingStage {
pub fn consume_buffered_packets(
my_pubkey: &Pubkey,
max_tx_ingestion_ns: u128,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
buffered_packet_batches: &mut UnprocessedPacketBatches,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
@ -672,8 +672,8 @@ impl BankingStage {
// TODO: Right now we iterate through buffer and try the highest weighted transaction once
// but we should retry the highest weighted transactions more often.
let (bank_start, poh_recorder_lock_time) = measure!(
poh_recorder.lock().unwrap().bank_start(),
"poh_recorder_lock",
poh_recorder.read().unwrap().bank_start(),
"poh_recorder.read",
);
slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us(
poh_recorder_lock_time.as_us(),
@ -718,7 +718,7 @@ impl BankingStage {
{
let poh_recorder_lock_time = {
let (poh_recorder_locked, poh_recorder_lock_time) =
measure!(poh_recorder.lock().unwrap(), "poh_recorder_lock");
measure!(poh_recorder.read().unwrap(), "poh_recorder.read");
reached_end_of_slot = Some(EndOfSlot {
next_slot_leader: poh_recorder_locked.next_slot_leader(),
@ -783,7 +783,7 @@ impl BankingStage {
// packet batches in buffer
let poh_recorder_lock_time = {
let (poh_recorder_locked, poh_recorder_lock_time) =
measure!(poh_recorder.lock().unwrap(), "poh_recorder_lock");
measure!(poh_recorder.read().unwrap(), "poh_recorder.read");
reached_end_of_slot = Some(EndOfSlot {
next_slot_leader: poh_recorder_locked.next_slot_leader(),
@ -907,7 +907,7 @@ impl BankingStage {
fn process_buffered_packets(
my_pubkey: &Pubkey,
socket: &UdpSocket,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
cluster_info: &ClusterInfo,
buffered_packet_batches: &mut UnprocessedPacketBatches,
forward_option: &ForwardOption,
@ -930,7 +930,7 @@ impl BankingStage {
would_be_leader,
would_be_leader_shortly,
) = {
let poh = poh_recorder.lock().unwrap();
let poh = poh_recorder.read().unwrap();
bank_start = poh.bank_start();
(
poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET),
@ -1037,7 +1037,7 @@ impl BankingStage {
forward_option: &ForwardOption,
cluster_info: &ClusterInfo,
buffered_packet_batches: &mut UnprocessedPacketBatches,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
socket: &UdpSocket,
hold: bool,
data_budget: &DataBudget,
@ -1100,7 +1100,7 @@ impl BankingStage {
#[allow(clippy::too_many_arguments)]
fn process_loop(
verified_receiver: &BankingPacketReceiver,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
cluster_info: &ClusterInfo,
recv_start: &mut Instant,
forward_option: ForwardOption,
@ -1112,7 +1112,7 @@ impl BankingStage {
cost_model: Arc<RwLock<CostModel>>,
connection_cache: Arc<ConnectionCache>,
) {
let recorder = poh_recorder.lock().unwrap().recorder();
let recorder = poh_recorder.read().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit);
let mut banking_stage_stats = BankingStageStats::new(id);
@ -2237,35 +2237,35 @@ impl BankingStage {
pub(crate) fn next_leader_tpu(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
poh_recorder: &RwLock<PohRecorder>,
) -> Option<(Pubkey, std::net::SocketAddr)> {
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu)
}
fn next_leader_tpu_forwards(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
poh_recorder: &RwLock<PohRecorder>,
) -> Option<(Pubkey, std::net::SocketAddr)> {
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_forwards)
}
pub(crate) fn next_leader_tpu_vote(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
poh_recorder: &RwLock<PohRecorder>,
) -> Option<(Pubkey, std::net::SocketAddr)> {
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_vote)
}
fn next_leader_x<F>(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
poh_recorder: &RwLock<PohRecorder>,
port_selector: F,
) -> Option<(Pubkey, std::net::SocketAddr)>
where
F: FnOnce(&ContactInfo) -> SocketAddr,
{
let leader_pubkey = poh_recorder
.lock()
.read()
.unwrap()
.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET);
if let Some(leader_pubkey) = leader_pubkey {
@ -2711,11 +2711,11 @@ mod tests {
Arc::new(AtomicBool::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank(&bank, false);
let pubkey = solana_sdk::pubkey::new_rand();
let keypair2 = Keypair::new();
let pubkey2 = solana_sdk::pubkey::new_rand();
@ -2740,7 +2740,7 @@ mod tests {
assert!(entry_receiver.try_recv().is_err());
poh_recorder
.lock()
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
@ -2933,11 +2933,11 @@ mod tests {
Arc::new(AtomicBool::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank(&bank, false);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let process_transactions_batch_output = BankingStage::process_and_record_transactions(
@ -2964,8 +2964,8 @@ mod tests {
assert!(commit_transactions_result.is_ok());
// Tick up to max tick height
while poh_recorder.lock().unwrap().tick_height() != bank.max_tick_height() {
poh_recorder.lock().unwrap().tick();
while poh_recorder.read().unwrap().tick_height() != bank.max_tick_height() {
poh_recorder.write().unwrap().tick();
}
let mut done = false;
@ -3021,7 +3021,7 @@ mod tests {
);
poh_recorder
.lock()
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
@ -3068,11 +3068,11 @@ mod tests {
Arc::new(AtomicBool::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank(&bank, false);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let process_transactions_batch_output = BankingStage::process_and_record_transactions(
@ -3104,7 +3104,7 @@ mod tests {
);
poh_recorder
.lock()
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
@ -3141,11 +3141,11 @@ mod tests {
Arc::new(AtomicBool::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank(&bank, false);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let qos_service = QosService::new(Arc::new(RwLock::new(CostModel::default())), 1);
@ -3229,7 +3229,7 @@ mod tests {
assert_eq!(get_tx_count(), 2);
poh_recorder
.lock()
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
@ -3240,10 +3240,10 @@ mod tests {
fn simulate_poh(
record_receiver: CrossbeamReceiver<Record>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
) -> JoinHandle<()> {
let poh_recorder = poh_recorder.clone();
let is_exited = poh_recorder.lock().unwrap().is_exited.clone();
let is_exited = poh_recorder.read().unwrap().is_exited.clone();
let tick_producer = Builder::new()
.name("solana-simulate_poh".to_string())
.spawn(move || loop {
@ -3293,9 +3293,9 @@ mod tests {
Arc::new(AtomicBool::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
poh_recorder.lock().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank(&bank, false);
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
@ -3312,7 +3312,7 @@ mod tests {
);
poh_recorder
.lock()
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
@ -3431,7 +3431,7 @@ mod tests {
// record
let recorder = poh_recorder.recorder();
let poh_simulator = simulate_poh(record_receiver, &Arc::new(Mutex::new(poh_recorder)));
let poh_simulator = simulate_poh(record_receiver, &Arc::new(RwLock::new(poh_recorder)));
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
@ -3493,9 +3493,9 @@ mod tests {
Arc::new(AtomicBool::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
poh_recorder.lock().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank(&bank, false);
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
@ -3512,7 +3512,7 @@ mod tests {
);
poh_recorder
.lock()
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
@ -3698,11 +3698,11 @@ mod tests {
Arc::new(AtomicBool::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank(&bank, false);
let shreds = entries_to_test_shreds(&entries, bank.slot(), 0, true, 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
@ -3756,7 +3756,7 @@ mod tests {
assert_eq!(actual_tx_results, expected_tx_results);
poh_recorder
.lock()
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
@ -3859,11 +3859,11 @@ mod tests {
Arc::new(AtomicBool::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank(&bank, false);
let shreds = entries_to_test_shreds(&entries, bank.slot(), 0, true, 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
@ -3914,7 +3914,7 @@ mod tests {
}
);
poh_recorder
.lock()
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
@ -3929,7 +3929,7 @@ mod tests {
) -> (
Vec<Transaction>,
Arc<Bank>,
Arc<Mutex<PohRecorder>>,
Arc<RwLock<PohRecorder>>,
Receiver<WorkingBankEntry>,
JoinHandle<()>,
) {
@ -3956,7 +3956,7 @@ mod tests {
&Arc::new(PohConfig::default()),
exit,
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
// Set up unparallelizable conflicting transactions
let pubkey0 = solana_sdk::pubkey::new_rand();
@ -3984,7 +3984,7 @@ mod tests {
{
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.lock().unwrap().recorder();
let recorder = poh_recorder.read().unwrap().recorder();
let num_conflicting_transactions = transactions.len();
let deserialized_packets =
unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions)
@ -3999,7 +3999,7 @@ mod tests {
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
// When the working bank in poh_recorder is None, no packets should be processed
assert!(!poh_recorder.lock().unwrap().has_bank());
assert!(!poh_recorder.read().unwrap().has_bank());
let max_tx_processing_ns = std::u128::MAX;
BankingStage::consume_buffered_packets(
&Pubkey::default(),
@ -4020,7 +4020,7 @@ mod tests {
// Processes one packet per iteration of the loop
let num_packets_to_process_per_iteration = num_conflicting_transactions;
for num_expected_unprocessed in (0..num_conflicting_transactions).rev() {
poh_recorder.lock().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank(&bank, false);
BankingStage::consume_buffered_packets(
&Pubkey::default(),
max_tx_processing_ns,
@ -4042,7 +4042,7 @@ mod tests {
}
}
poh_recorder
.lock()
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
@ -4069,9 +4069,9 @@ mod tests {
// each iteration of this loop will process one element of the batch per iteration of the
// loop.
let interrupted_iteration = 1;
poh_recorder.lock().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank(&bank, false);
let poh_recorder_ = poh_recorder.clone();
let recorder = poh_recorder_.lock().unwrap().recorder();
let recorder = poh_recorder_.read().unwrap().recorder();
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
// Start up thread to process the banks
let t_consume = Builder::new()
@ -4126,7 +4126,7 @@ mod tests {
finished_packet_receiver.recv().unwrap();
if i == interrupted_iteration {
poh_recorder
.lock()
.write()
.unwrap()
.schedule_dummy_max_height_reached_failure();
}
@ -4135,7 +4135,7 @@ mod tests {
t_consume.join().unwrap();
poh_recorder
.lock()
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);

View File

@ -48,7 +48,7 @@ use {
iter::repeat,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, RwLock,
Arc, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
time::{Duration, Instant},
@ -234,7 +234,7 @@ impl ClusterInfoVoteListener {
exit: Arc<AtomicBool>,
cluster_info: Arc<ClusterInfo>,
verified_packets_sender: BankingPacketSender,
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_recorder: Arc<RwLock<PohRecorder>>,
vote_tracker: Arc<VoteTracker>,
bank_forks: Arc<RwLock<BankForks>>,
subscriptions: Arc<RpcSubscriptions>,
@ -375,7 +375,7 @@ impl ClusterInfoVoteListener {
fn bank_send_loop(
exit: Arc<AtomicBool>,
verified_vote_label_packets_receiver: VerifiedLabelVotePacketsReceiver,
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_recorder: Arc<RwLock<PohRecorder>>,
verified_packets_sender: &BankingPacketSender,
) -> Result<()> {
let mut verified_vote_packets = VerifiedVotePackets::default();
@ -388,7 +388,7 @@ impl ClusterInfoVoteListener {
}
let would_be_leader = poh_recorder
.lock()
.read()
.unwrap()
.would_be_leader(3 * slot_hashes::MAX_ENTRIES as u64 * DEFAULT_TICKS_PER_SLOT);
@ -409,7 +409,7 @@ impl ClusterInfoVoteListener {
// Always set this to avoid taking the poh lock too often
time_since_lock = Instant::now();
// We will take this lock at most once every `BANK_SEND_VOTES_LOOP_SLEEP_MS`
let current_working_bank = poh_recorder.lock().unwrap().bank();
let current_working_bank = poh_recorder.read().unwrap().bank();
if let Some(current_working_bank) = current_working_bank {
Self::check_for_leader_bank_and_send_votes(
&mut bank_vote_sender_state_option,

View File

@ -20,7 +20,7 @@ use {
net::UdpSocket,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
Arc, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
@ -38,7 +38,7 @@ impl FetchStage {
tpu_forwards_sockets: Vec<UdpSocket>,
tpu_vote_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
coalesce_ms: u64,
) -> (Self, PacketBatchReceiver, PacketBatchReceiver) {
let (sender, receiver) = unbounded();
@ -73,7 +73,7 @@ impl FetchStage {
vote_sender: &PacketBatchSender,
forward_sender: &PacketBatchSender,
forward_receiver: PacketBatchReceiver,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
coalesce_ms: u64,
in_vote_only_mode: Option<Arc<AtomicBool>>,
) -> Self {
@ -98,7 +98,7 @@ impl FetchStage {
fn handle_forwarded_packets(
recvr: &PacketBatchReceiver,
sendr: &PacketBatchSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
) -> Result<()> {
let mark_forwarded = |packet: &mut Packet| {
packet.meta.flags |= PacketFlags::FORWARDED;
@ -119,7 +119,7 @@ impl FetchStage {
}
if poh_recorder
.lock()
.read()
.unwrap()
.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET.saturating_mul(DEFAULT_TICKS_PER_SLOT))
{
@ -147,7 +147,7 @@ impl FetchStage {
vote_sender: &PacketBatchSender,
forward_sender: &PacketBatchSender,
forward_receiver: PacketBatchReceiver,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
coalesce_ms: u64,
in_vote_only_mode: Option<Arc<AtomicBool>>,
) -> Self {

View File

@ -71,7 +71,7 @@ use {
result,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, RwLock,
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
@ -360,7 +360,7 @@ impl ReplayStage {
cluster_info: Arc<ClusterInfo>,
ledger_signal_receiver: Receiver<bool>,
duplicate_slots_receiver: DuplicateSlotReceiver,
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_recorder: Arc<RwLock<PohRecorder>>,
maybe_process_blockstore: Option<ProcessBlockStore>,
vote_tracker: Arc<VoteTracker>,
cluster_slots: Arc<ClusterSlots>,
@ -476,7 +476,7 @@ impl ReplayStage {
);
generate_new_bank_forks_time.stop();
let mut tpu_has_bank = poh_recorder.lock().unwrap().has_bank();
let mut tpu_has_bank = poh_recorder.read().unwrap().has_bank();
let mut replay_active_banks_time = Measure::start("replay_active_banks_time");
let mut ancestors = bank_forks.read().unwrap().ancestors();
@ -833,7 +833,7 @@ impl ReplayStage {
let mut start_leader_time = Measure::start("start_leader_time");
let mut dump_then_repair_correct_slots_time = Measure::start("dump_then_repair_correct_slots_time");
// Used for correctness check
let poh_bank = poh_recorder.lock().unwrap().bank();
let poh_bank = poh_recorder.read().unwrap().bank();
// Dump any duplicate slots that have been confirmed by the network in
// anticipation of repairing the confirmed version of the slot.
//
@ -868,7 +868,7 @@ impl ReplayStage {
transaction_status_sender.is_some(),
);
let poh_bank = poh_recorder.lock().unwrap().bank();
let poh_bank = poh_recorder.read().unwrap().bank();
if let Some(bank) = poh_bank {
Self::log_leader_change(
&my_pubkey,
@ -1000,11 +1000,11 @@ impl ReplayStage {
}
fn retransmit_latest_unpropagated_leader_slot(
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
retransmit_slots_sender: &RetransmitSlotsSender,
progress: &mut ProgressMap,
) {
let start_slot = poh_recorder.lock().unwrap().start_slot();
let start_slot = poh_recorder.read().unwrap().start_slot();
if let (false, Some(latest_leader_slot)) =
progress.get_leader_propagation_slot_must_exist(start_slot)
@ -1545,7 +1545,7 @@ impl ReplayStage {
fn maybe_start_leader(
my_pubkey: &Pubkey,
bank_forks: &Arc<RwLock<BankForks>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
progress_map: &mut ProgressMap,
@ -1554,12 +1554,12 @@ impl ReplayStage {
has_new_vote_been_rooted: bool,
track_transaction_indexes: bool,
) {
// all the individual calls to poh_recorder.lock() are designed to
// all the individual calls to poh_recorder.read() are designed to
// increase granularity, decrease contention
assert!(!poh_recorder.lock().unwrap().has_bank());
assert!(!poh_recorder.read().unwrap().has_bank());
let (poh_slot, parent_slot) = match poh_recorder.lock().unwrap().reached_leader_slot() {
let (poh_slot, parent_slot) = match poh_recorder.read().unwrap().reached_leader_slot() {
PohLeaderStatus::Reached {
poh_slot,
parent_slot,
@ -1674,7 +1674,7 @@ impl ReplayStage {
let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank);
poh_recorder
.lock()
.write()
.unwrap()
.set_bank(&tpu_bank, track_transaction_indexes);
} else {
@ -2150,7 +2150,7 @@ impl ReplayStage {
my_pubkey: &Pubkey,
blockstore: &Blockstore,
bank: &Arc<Bank>,
poh_recorder: &Mutex<PohRecorder>,
poh_recorder: &RwLock<PohRecorder>,
leader_schedule_cache: &LeaderScheduleCache,
) {
let next_leader_slot = leader_schedule_cache.next_leader_slot(
@ -2161,7 +2161,7 @@ impl ReplayStage {
GRACE_TICKS_FACTOR * MAX_GRACE_SLOTS,
);
poh_recorder
.lock()
.write()
.unwrap()
.reset(bank.clone(), next_leader_slot);
@ -3271,7 +3271,7 @@ pub(crate) mod tests {
my_pubkey: Pubkey,
cluster_info: ClusterInfo,
leader_schedule_cache: Arc<LeaderScheduleCache>,
poh_recorder: Mutex<PohRecorder>,
poh_recorder: RwLock<PohRecorder>,
tower: Tower,
rpc_subscriptions: Arc<RpcSubscriptions>,
pub vote_simulator: VoteSimulator,
@ -3322,7 +3322,7 @@ pub(crate) mod tests {
// PohRecorder
let working_bank = bank_forks.read().unwrap().working_bank();
let poh_recorder = Mutex::new(
let poh_recorder = RwLock::new(
PohRecorder::new(
working_bank.tick_height(),
working_bank.last_blockhash(),

View File

@ -36,7 +36,7 @@ use {
},
std::{
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
sync::{atomic::AtomicBool, Arc, RwLock},
thread,
},
};
@ -73,7 +73,7 @@ impl Tpu {
#[allow(clippy::too_many_arguments)]
pub fn new(
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
entry_receiver: Receiver<WorkingBankEntry>,
retransmit_slots_receiver: RetransmitSlotsReceiver,
sockets: TpuSockets,

View File

@ -52,7 +52,7 @@ use {
std::{
collections::HashSet,
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
sync::{atomic::AtomicBool, Arc, RwLock},
thread,
},
};
@ -105,7 +105,7 @@ impl Tvu {
blockstore: Arc<Blockstore>,
ledger_signal_receiver: Receiver<bool>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
maybe_process_block_store: Option<ProcessBlockStore>,
tower_storage: Arc<dyn TowerStorage>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,

View File

@ -106,7 +106,7 @@ use {
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, RwLock,
Arc, RwLock,
},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
@ -341,7 +341,7 @@ pub struct Validator {
serve_repair_service: ServeRepairService,
completed_data_sets_service: CompletedDataSetsService,
snapshot_packager_service: Option<SnapshotPackagerService>,
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_recorder: Arc<RwLock<PohRecorder>>,
poh_service: PohService,
tpu: Tpu,
tvu: Tvu,
@ -755,7 +755,7 @@ impl Validator {
exit.clone(),
)
};
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let use_quic = UseQUIC::new(use_quic).expect("Failed to initialize QUIC flags");
let connection_cache = Arc::new(ConnectionCache::new(use_quic, tpu_connection_pool_size));

View File

@ -7,7 +7,7 @@ use {
solana_runtime::bank_forks::BankForks,
solana_sdk::{clock::Slot, transaction::Transaction},
std::{
sync::{Arc, Mutex, RwLock},
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
},
};
@ -41,7 +41,7 @@ impl VotingService {
pub fn new(
vote_receiver: Receiver<VoteOp>,
cluster_info: Arc<ClusterInfo>,
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_recorder: Arc<RwLock<PohRecorder>>,
tower_storage: Arc<dyn TowerStorage>,
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
@ -66,7 +66,7 @@ impl VotingService {
pub fn handle_vote(
cluster_info: &ClusterInfo,
poh_recorder: &Mutex<PohRecorder>,
poh_recorder: &RwLock<PohRecorder>,
tower_storage: &dyn TowerStorage,
vote_op: VoteOp,
send_to_tpu_vote_port: bool,

View File

@ -9,7 +9,7 @@ use {
std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
Arc, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
@ -28,7 +28,7 @@ impl WarmQuicCacheService {
pub fn new(
connection_cache: Arc<ConnectionCache>,
cluster_info: Arc<ClusterInfo>,
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_recorder: Arc<RwLock<PohRecorder>>,
exit: Arc<AtomicBool>,
) -> Self {
let thread_hdl = Builder::new()
@ -38,7 +38,7 @@ impl WarmQuicCacheService {
let mut maybe_last_leader = None;
while !exit.load(Ordering::Relaxed) {
let leader_pubkey = poh_recorder
.lock()
.read()
.unwrap()
.leader_after_n_slots((CACHE_OFFSET_SLOT + slot_jitter) as u64);
if let Some(leader_pubkey) = leader_pubkey {

View File

@ -32,7 +32,7 @@ use {
cmp,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
Arc, Mutex, RwLock,
},
time::{Duration, Instant},
},
@ -949,7 +949,7 @@ pub fn create_test_recorder(
leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
) -> (
Arc<AtomicBool>,
Arc<Mutex<PohRecorder>>,
Arc<RwLock<PohRecorder>>,
PohService,
Receiver<WorkingBankEntry>,
) {
@ -973,7 +973,7 @@ pub fn create_test_recorder(
);
poh_recorder.set_bank(bank, false);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let poh_service = PohService::new(
poh_recorder.clone(),
&poh_config,

View File

@ -10,7 +10,7 @@ use {
std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
Arc, Mutex, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
@ -95,7 +95,7 @@ impl PohTiming {
impl PohService {
pub fn new(
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_recorder: Arc<RwLock<PohRecorder>>,
poh_config: &Arc<PohConfig>,
poh_exit: &Arc<AtomicBool>,
ticks_per_slot: u64,
@ -163,7 +163,7 @@ impl PohService {
}
fn sleepy_tick_producer(
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_recorder: Arc<RwLock<PohRecorder>>,
poh_config: &PohConfig,
poh_exit: &AtomicBool,
record_receiver: Receiver<Record>,
@ -180,13 +180,13 @@ impl PohService {
);
if remaining_tick_time.is_zero() {
last_tick = Instant::now();
poh_recorder.lock().unwrap().tick();
poh_recorder.write().unwrap().tick();
}
}
}
pub fn read_record_receiver_and_process(
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
record_receiver: &Receiver<Record>,
timeout: Duration,
) {
@ -194,7 +194,7 @@ impl PohService {
if let Ok(record) = record {
if record
.sender
.send(poh_recorder.lock().unwrap().record(
.send(poh_recorder.write().unwrap().record(
record.slot,
record.mixin,
record.transactions,
@ -207,7 +207,7 @@ impl PohService {
}
fn short_lived_sleepy_tick_producer(
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_recorder: Arc<RwLock<PohRecorder>>,
poh_config: &PohConfig,
poh_exit: &AtomicBool,
record_receiver: Receiver<Record>,
@ -227,7 +227,7 @@ impl PohService {
);
if remaining_tick_time.is_zero() {
last_tick = Instant::now();
poh_recorder.lock().unwrap().tick();
poh_recorder.write().unwrap().tick();
elapsed_ticks += 1;
}
if poh_exit.load(Ordering::Relaxed) && !warned {
@ -240,7 +240,7 @@ impl PohService {
// returns true if we need to tick
fn record_or_hash(
next_record: &mut Option<Record>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
timing: &mut PohTiming,
record_receiver: &Receiver<Record>,
hashes_per_batch: u64,
@ -252,7 +252,7 @@ impl PohService {
// received message to record
// so, record for as long as we have queued up record requests
let mut lock_time = Measure::start("lock");
let mut poh_recorder_l = poh_recorder.lock().unwrap();
let mut poh_recorder_l = poh_recorder.write().unwrap();
lock_time.stop();
timing.total_lock_time_ns += lock_time.as_ns();
let mut record_time = Measure::start("record");
@ -332,14 +332,14 @@ impl PohService {
}
fn tick_producer(
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_recorder: Arc<RwLock<PohRecorder>>,
poh_exit: &AtomicBool,
ticks_per_slot: u64,
hashes_per_batch: u64,
record_receiver: Receiver<Record>,
target_ns_per_tick: u64,
) {
let poh = poh_recorder.lock().unwrap().poh.clone();
let poh = poh_recorder.read().unwrap().poh.clone();
let mut timing = PohTiming::new();
let mut next_record = None;
loop {
@ -356,7 +356,7 @@ impl PohService {
// Lock PohRecorder only for the final hash. record_or_hash will lock PohRecorder for record calls but not for hashing.
{
let mut lock_time = Measure::start("lock");
let mut poh_recorder_l = poh_recorder.lock().unwrap();
let mut poh_recorder_l = poh_recorder.write().unwrap();
lock_time.stop();
timing.total_lock_time_ns += lock_time.as_ns();
let mut tick_time = Measure::start("tick");
@ -436,7 +436,7 @@ mod tests {
&poh_config,
exit.clone(),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let ticks_per_slot = bank.ticks_per_slot();
let bank_slot = bank.slot();
@ -462,7 +462,7 @@ mod tests {
loop {
// send some data
let mut time = Measure::start("record");
let _ = poh_recorder.lock().unwrap().record(
let _ = poh_recorder.write().unwrap().record(
bank_slot,
h1,
vec![tx.clone()],
@ -500,7 +500,7 @@ mod tests {
hashes_per_batch,
record_receiver,
);
poh_recorder.lock().unwrap().set_bank(&bank, false);
poh_recorder.write().unwrap().set_bank(&bank, false);
// get some events
let mut hashes = 0;

View File

@ -6,19 +6,19 @@ use {
std::{
collections::HashMap,
net::SocketAddr,
sync::{Arc, Mutex},
sync::{Arc, RwLock},
},
};
#[derive(Clone)]
pub struct ClusterTpuInfo {
cluster_info: Arc<ClusterInfo>,
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_recorder: Arc<RwLock<PohRecorder>>,
recent_peers: HashMap<Pubkey, SocketAddr>,
}
impl ClusterTpuInfo {
pub fn new(cluster_info: Arc<ClusterInfo>, poh_recorder: Arc<Mutex<PohRecorder>>) -> Self {
pub fn new(cluster_info: Arc<ClusterInfo>, poh_recorder: Arc<RwLock<PohRecorder>>) -> Self {
Self {
cluster_info,
poh_recorder,
@ -38,7 +38,7 @@ impl TpuInfo for ClusterTpuInfo {
}
fn get_leader_tpus(&self, max_count: u64) -> Vec<&SocketAddr> {
let recorder = self.poh_recorder.lock().unwrap();
let recorder = self.poh_recorder.read().unwrap();
let leaders: Vec<_> = (0..max_count)
.filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
.collect();
@ -141,7 +141,7 @@ mod test {
.collect();
let leader_info = ClusterTpuInfo {
cluster_info,
poh_recorder: Arc::new(Mutex::new(poh_recorder)),
poh_recorder: Arc::new(RwLock::new(poh_recorder)),
recent_peers: recent_peers.clone(),
};

View File

@ -45,7 +45,7 @@ use {
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, RwLock,
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
},
@ -342,7 +342,7 @@ impl JsonRpcService {
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
blockstore: Arc<Blockstore>,
cluster_info: Arc<ClusterInfo>,
poh_recorder: Option<Arc<Mutex<PohRecorder>>>,
poh_recorder: Option<Arc<RwLock<PohRecorder>>>,
genesis_hash: Hash,
ledger_path: &Path,
validator_exit: Arc<RwLock<Exit>>,