removes redundant Mutex wrappers around Receiver channels (#32387)

Receiver channels are thread-safe and do not need a Mutex wrappers.
This commit is contained in:
behzad nouri 2023-07-05 22:19:59 +00:00 committed by GitHub
parent 39841cc35a
commit 6068676c2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 18 additions and 23 deletions

View File

@ -168,12 +168,12 @@ trait BroadcastRun {
) -> Result<()>;
fn transmit(
&mut self,
receiver: &Mutex<TransmitReceiver>,
receiver: &TransmitReceiver,
cluster_info: &ClusterInfo,
sock: &UdpSocket,
bank_forks: &RwLock<BankForks>,
) -> Result<()>;
fn record(&mut self, receiver: &Mutex<RecordReceiver>, blockstore: &Blockstore) -> Result<()>;
fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()>;
}
// Implement a destructor for the BroadcastStage thread to signal it exited
@ -291,7 +291,6 @@ impl BroadcastStage {
.unwrap()
};
let mut thread_hdls = vec![thread_hdl];
let socket_receiver = Arc::new(Mutex::new(socket_receiver));
thread_hdls.extend(socks.into_iter().map(|sock| {
let socket_receiver = socket_receiver.clone();
let mut bs_transmit = broadcast_stage_run.clone();
@ -309,7 +308,6 @@ impl BroadcastStage {
.spawn(run_transmit)
.unwrap()
}));
let blockstore_receiver = Arc::new(Mutex::new(blockstore_receiver));
thread_hdls.extend(
repeat_with(|| {
let blockstore_receiver = blockstore_receiver.clone();

View File

@ -261,12 +261,12 @@ impl BroadcastRun for BroadcastDuplicatesRun {
fn transmit(
&mut self,
receiver: &Mutex<TransmitReceiver>,
receiver: &TransmitReceiver,
cluster_info: &ClusterInfo,
sock: &UdpSocket,
bank_forks: &RwLock<BankForks>,
) -> Result<()> {
let (shreds, _) = receiver.lock().unwrap().recv()?;
let (shreds, _) = receiver.recv()?;
if shreds.is_empty() {
return Ok(());
}
@ -356,8 +356,8 @@ impl BroadcastRun for BroadcastDuplicatesRun {
Ok(())
}
fn record(&mut self, receiver: &Mutex<RecordReceiver>, blockstore: &Blockstore) -> Result<()> {
let (all_shreds, _) = receiver.lock().unwrap().recv()?;
fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> {
let (all_shreds, _) = receiver.recv()?;
blockstore
.insert_shreds(all_shreds.to_vec(), None, true)
.expect("Failed to insert shreds in blockstore");

View File

@ -128,12 +128,12 @@ impl BroadcastRun for BroadcastFakeShredsRun {
}
fn transmit(
&mut self,
receiver: &Mutex<TransmitReceiver>,
receiver: &TransmitReceiver,
cluster_info: &ClusterInfo,
sock: &UdpSocket,
_bank_forks: &RwLock<BankForks>,
) -> Result<()> {
for (data_shreds, batch_info) in receiver.lock().unwrap().iter() {
for (data_shreds, batch_info) in receiver {
let fake = batch_info.is_some();
let peers = cluster_info.tvu_peers();
peers.iter().enumerate().for_each(|(i, peer)| {
@ -149,8 +149,8 @@ impl BroadcastRun for BroadcastFakeShredsRun {
}
Ok(())
}
fn record(&mut self, receiver: &Mutex<RecordReceiver>, blockstore: &Blockstore) -> Result<()> {
for (data_shreds, _) in receiver.lock().unwrap().iter() {
fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> {
for (data_shreds, _) in receiver {
blockstore.insert_shreds(data_shreds.to_vec(), None, true)?;
}
Ok(())

View File

@ -160,12 +160,12 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
}
fn transmit(
&mut self,
receiver: &Mutex<TransmitReceiver>,
receiver: &TransmitReceiver,
cluster_info: &ClusterInfo,
sock: &UdpSocket,
bank_forks: &RwLock<BankForks>,
) -> Result<()> {
let (shreds, _) = receiver.lock().unwrap().recv()?;
let (shreds, _) = receiver.recv()?;
broadcast_shreds(
sock,
&shreds,
@ -178,8 +178,8 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
cluster_info.socket_addr_space(),
)
}
fn record(&mut self, receiver: &Mutex<RecordReceiver>, blockstore: &Blockstore) -> Result<()> {
let (all_shreds, _) = receiver.lock().unwrap().recv()?;
fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> {
let (all_shreds, _) = receiver.recv()?;
blockstore
.insert_shreds(all_shreds.to_vec(), None, true)
.expect("Failed to insert shreds in blockstore");

View File

@ -199,9 +199,6 @@ impl StandardBroadcastRun {
let (bsend, brecv) = unbounded();
let (ssend, srecv) = unbounded();
self.process_receive_results(keypair, blockstore, &ssend, &bsend, receive_results)?;
let srecv = Arc::new(Mutex::new(srecv));
let brecv = Arc::new(Mutex::new(brecv));
//data
let _ = self.transmit(&srecv, cluster_info, sock, bank_forks);
let _ = self.record(&brecv, blockstore);
@ -486,16 +483,16 @@ impl BroadcastRun for StandardBroadcastRun {
}
fn transmit(
&mut self,
receiver: &Mutex<TransmitReceiver>,
receiver: &TransmitReceiver,
cluster_info: &ClusterInfo,
sock: &UdpSocket,
bank_forks: &RwLock<BankForks>,
) -> Result<()> {
let (shreds, batch_info) = receiver.lock().unwrap().recv()?;
let (shreds, batch_info) = receiver.recv()?;
self.broadcast(sock, cluster_info, shreds, batch_info, bank_forks)
}
fn record(&mut self, receiver: &Mutex<RecordReceiver>, blockstore: &Blockstore) -> Result<()> {
let (shreds, slot_start_ts) = receiver.lock().unwrap().recv()?;
fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> {
let (shreds, slot_start_ts) = receiver.recv()?;
self.insert(blockstore, shreds, slot_start_ts);
Ok(())
}