diff --git a/turbine/src/broadcast_stage.rs b/turbine/src/broadcast_stage.rs index d08c31444..116f48885 100644 --- a/turbine/src/broadcast_stage.rs +++ b/turbine/src/broadcast_stage.rs @@ -168,12 +168,12 @@ trait BroadcastRun { ) -> Result<()>; fn transmit( &mut self, - receiver: &Mutex, + receiver: &TransmitReceiver, cluster_info: &ClusterInfo, sock: &UdpSocket, bank_forks: &RwLock, ) -> Result<()>; - fn record(&mut self, receiver: &Mutex, blockstore: &Blockstore) -> Result<()>; + fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()>; } // Implement a destructor for the BroadcastStage thread to signal it exited @@ -291,7 +291,6 @@ impl BroadcastStage { .unwrap() }; let mut thread_hdls = vec![thread_hdl]; - let socket_receiver = Arc::new(Mutex::new(socket_receiver)); thread_hdls.extend(socks.into_iter().map(|sock| { let socket_receiver = socket_receiver.clone(); let mut bs_transmit = broadcast_stage_run.clone(); @@ -309,7 +308,6 @@ impl BroadcastStage { .spawn(run_transmit) .unwrap() })); - let blockstore_receiver = Arc::new(Mutex::new(blockstore_receiver)); thread_hdls.extend( repeat_with(|| { let blockstore_receiver = blockstore_receiver.clone(); diff --git a/turbine/src/broadcast_stage/broadcast_duplicates_run.rs b/turbine/src/broadcast_stage/broadcast_duplicates_run.rs index 08eec8983..61f3a9b0f 100644 --- a/turbine/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/turbine/src/broadcast_stage/broadcast_duplicates_run.rs @@ -261,12 +261,12 @@ impl BroadcastRun for BroadcastDuplicatesRun { fn transmit( &mut self, - receiver: &Mutex, + receiver: &TransmitReceiver, cluster_info: &ClusterInfo, sock: &UdpSocket, bank_forks: &RwLock, ) -> Result<()> { - let (shreds, _) = receiver.lock().unwrap().recv()?; + let (shreds, _) = receiver.recv()?; if shreds.is_empty() { return Ok(()); } @@ -356,8 +356,8 @@ impl BroadcastRun for BroadcastDuplicatesRun { Ok(()) } - fn record(&mut self, receiver: &Mutex, blockstore: &Blockstore) -> Result<()> { - let (all_shreds, _) = receiver.lock().unwrap().recv()?; + fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> { + let (all_shreds, _) = receiver.recv()?; blockstore .insert_shreds(all_shreds.to_vec(), None, true) .expect("Failed to insert shreds in blockstore"); diff --git a/turbine/src/broadcast_stage/broadcast_fake_shreds_run.rs b/turbine/src/broadcast_stage/broadcast_fake_shreds_run.rs index 4269e0186..05c48de50 100644 --- a/turbine/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/turbine/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -128,12 +128,12 @@ impl BroadcastRun for BroadcastFakeShredsRun { } fn transmit( &mut self, - receiver: &Mutex, + receiver: &TransmitReceiver, cluster_info: &ClusterInfo, sock: &UdpSocket, _bank_forks: &RwLock, ) -> Result<()> { - for (data_shreds, batch_info) in receiver.lock().unwrap().iter() { + for (data_shreds, batch_info) in receiver { let fake = batch_info.is_some(); let peers = cluster_info.tvu_peers(); peers.iter().enumerate().for_each(|(i, peer)| { @@ -149,8 +149,8 @@ impl BroadcastRun for BroadcastFakeShredsRun { } Ok(()) } - fn record(&mut self, receiver: &Mutex, blockstore: &Blockstore) -> Result<()> { - for (data_shreds, _) in receiver.lock().unwrap().iter() { + fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> { + for (data_shreds, _) in receiver { blockstore.insert_shreds(data_shreds.to_vec(), None, true)?; } Ok(()) diff --git a/turbine/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/turbine/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 4e97cff12..85500b1dd 100644 --- a/turbine/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/turbine/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -160,12 +160,12 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { } fn transmit( &mut self, - receiver: &Mutex, + receiver: &TransmitReceiver, cluster_info: &ClusterInfo, sock: &UdpSocket, bank_forks: &RwLock, ) -> Result<()> { - let (shreds, _) = receiver.lock().unwrap().recv()?; + let (shreds, _) = receiver.recv()?; broadcast_shreds( sock, &shreds, @@ -178,8 +178,8 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { cluster_info.socket_addr_space(), ) } - fn record(&mut self, receiver: &Mutex, blockstore: &Blockstore) -> Result<()> { - let (all_shreds, _) = receiver.lock().unwrap().recv()?; + fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> { + let (all_shreds, _) = receiver.recv()?; blockstore .insert_shreds(all_shreds.to_vec(), None, true) .expect("Failed to insert shreds in blockstore"); diff --git a/turbine/src/broadcast_stage/standard_broadcast_run.rs b/turbine/src/broadcast_stage/standard_broadcast_run.rs index 3e51609e4..34dd8c36b 100644 --- a/turbine/src/broadcast_stage/standard_broadcast_run.rs +++ b/turbine/src/broadcast_stage/standard_broadcast_run.rs @@ -199,9 +199,6 @@ impl StandardBroadcastRun { let (bsend, brecv) = unbounded(); let (ssend, srecv) = unbounded(); self.process_receive_results(keypair, blockstore, &ssend, &bsend, receive_results)?; - let srecv = Arc::new(Mutex::new(srecv)); - let brecv = Arc::new(Mutex::new(brecv)); - //data let _ = self.transmit(&srecv, cluster_info, sock, bank_forks); let _ = self.record(&brecv, blockstore); @@ -486,16 +483,16 @@ impl BroadcastRun for StandardBroadcastRun { } fn transmit( &mut self, - receiver: &Mutex, + receiver: &TransmitReceiver, cluster_info: &ClusterInfo, sock: &UdpSocket, bank_forks: &RwLock, ) -> Result<()> { - let (shreds, batch_info) = receiver.lock().unwrap().recv()?; + let (shreds, batch_info) = receiver.recv()?; self.broadcast(sock, cluster_info, shreds, batch_info, bank_forks) } - fn record(&mut self, receiver: &Mutex, blockstore: &Blockstore) -> Result<()> { - let (shreds, slot_start_ts) = receiver.lock().unwrap().recv()?; + fn record(&mut self, receiver: &RecordReceiver, blockstore: &Blockstore) -> Result<()> { + let (shreds, slot_start_ts) = receiver.recv()?; self.insert(blockstore, shreds, slot_start_ts); Ok(()) }